diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index 4dc04f46fd..bc67d03078 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -564,7 +564,7 @@ public void testTableValidation() throws Exception { private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles, String... records) throws Exception { ValidWriteIdList writeIds = getTransactionContext(conf); - AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, writeIds); + AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, writeIds, null, false, null, false); Assert.assertEquals(0, dir.getObsolete().size()); Assert.assertEquals(0, dir.getOriginalFiles().size()); List current = dir.getCurrentDirectories(); @@ -617,7 +617,8 @@ private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int */ private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles, String validationQuery, boolean vectorize, String... records) throws Exception { - AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, getTransactionContext(conf)); + AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null, + false, null, false); Assert.assertEquals(0, dir.getObsolete().size()); Assert.assertEquals(0, dir.getOriginalFiles().size()); List current = dir.getCurrentDirectories(); @@ -667,7 +668,8 @@ private ValidWriteIdList getTransactionContext(Configuration conf) throws Except return TxnCommonUtils.createValidReaderWriteIdList(v.get(0)); } private void checkNothingWritten(Path partitionPath) throws Exception { - AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, getTransactionContext(conf)); + AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null, + false, null, false); Assert.assertEquals(0, dir.getObsolete().size()); Assert.assertEquals(0, dir.getOriginalFiles().size()); List current = dir.getCurrentDirectories(); @@ -1250,7 +1252,7 @@ public void testInterleavedTransactionBatchCommits() throws Exception { /*now both batches have committed (but not closed) so we for each primary file we expect a side file to exist and indicate the true length of primary file*/ FileSystem fs = partLoc.getFileSystem(conf); - AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf, getTransactionContext(conf)); + AcidUtils.Directory dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false, null, false); for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) { for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) { Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath()); @@ -1275,7 +1277,7 @@ public void testInterleavedTransactionBatchCommits() throws Exception { //so each of 2 deltas has 1 bucket0 and 1 bucket0_flush_length. Furthermore, each bucket0 //has now received more data(logically - it's buffered) but it is not yet committed. 
//lets check that side files exist, etc - dir = AcidUtils.getAcidState(partLoc, conf, getTransactionContext(conf)); + dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false, null, false); for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) { for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) { Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath()); diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java index 78cae7263b..86f762e97c 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java @@ -96,7 +96,7 @@ public StreamingAssert newStreamingAssert(Table table, List partition) t writeIds = TxnCommonUtils.createValidReaderWriteIdList(v.get(0)); partitionLocation = getPartitionLocation(); - dir = AcidUtils.getAcidState(partitionLocation, conf, writeIds); + dir = AcidUtils.getAcidState(null, partitionLocation, conf, writeIds, null, false, null, true); assertEquals(0, dir.getObsolete().size()); assertEquals(0, dir.getOriginalFiles().size()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 295fe7cbd0..f207bf2fdb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.io; import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD; +import static org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator.UNION_SUDBIR_PREFIX; import java.io.IOException; import java.io.Serializable; @@ -27,6 +28,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -40,8 +42,10 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hive.common.*; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -183,6 +187,39 @@ public boolean accept(Path p){ return !name.startsWith("_") && !name.startsWith("."); } }; + + public static final PathFilter acidHiddenFileFilter = new PathFilter() { + @Override + public boolean accept(Path p) { + String name = p.getName(); + // Don't filter out MetaDataFile.METADATA_FILE + if (name.startsWith(MetaDataFile.METADATA_FILE)) { + return true; + } + // Don't filter out OrcAcidVersion.ACID_FORMAT + if (name.startsWith(OrcAcidVersion.ACID_FORMAT)) { + return true; + } + return !name.startsWith("_") && !name.startsWith("."); + } + }; + + public static final PathFilter acidTempDirFilter = new PathFilter() { + @Override + public boolean accept(Path dirPath) { + String dirPathStr = dirPath.toString(); + // We don't want to filter out temp tables + if (dirPathStr.contains(SessionState.TMP_PREFIX)) { + return true; + } + if ((dirPathStr.contains("/.")) || (dirPathStr.contains("/_"))) { + return false; + } else { + return true; + } + } + }; + public static final String VISIBILITY_PREFIX 
= "_v"; public static final Pattern VISIBILITY_PATTERN = Pattern.compile(VISIBILITY_PREFIX + "\\d+"); @@ -422,7 +459,7 @@ else if (filename.startsWith(BUCKET_PREFIX)) { .writingBase(true); } else if (bucketFile.getParent().getName().startsWith(DELTA_PREFIX)) { ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELTA_PREFIX, - bucketFile.getFileSystem(conf)); + bucketFile.getFileSystem(conf), null); result .setOldStyle(false) .minimumWriteId(parsedDelta.minWriteId) @@ -430,7 +467,7 @@ else if (filename.startsWith(BUCKET_PREFIX)) { .bucket(bucket); } else if (bucketFile.getParent().getName().startsWith(DELETE_DELTA_PREFIX)) { ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELETE_DELTA_PREFIX, - bucketFile.getFileSystem(conf)); + bucketFile.getFileSystem(conf), null); result .setOldStyle(false) .minimumWriteId(parsedDelta.minWriteId) @@ -493,6 +530,12 @@ public boolean isBaseInRawFormat() { public List getAbortedDirectories() { return abortedDirectories; } + + @Override + public String toString() { + return "Aborted Directories: " + abortedDirectories + "; isBaseInRawFormat: " + isBaseInRawFormat + "; original: " + + original + "; obsolete: " + obsolete + "; deltas: " + deltas + "; base: " + base; + } } //This is used for (full) Acid tables. InsertOnly use NOT_ACID @@ -1029,26 +1072,26 @@ else if(statementId != parsedDelta.statementId) { public static ParsedDelta parsedDelta(Path deltaDir, FileSystem fs) throws IOException { String deltaDirName = deltaDir.getName(); if (deltaDirName.startsWith(DELETE_DELTA_PREFIX)) { - return parsedDelta(deltaDir, DELETE_DELTA_PREFIX, fs); + return parsedDelta(deltaDir, DELETE_DELTA_PREFIX, fs, null); } - return parsedDelta(deltaDir, DELTA_PREFIX, fs); // default prefix is delta_prefix + return parsedDelta(deltaDir, DELTA_PREFIX, fs, null); // default prefix is delta_prefix } - private static ParsedDelta parseDelta(Path path, String deltaPrefix, FileSystem fs) + private static ParsedDelta parseDelta(Path path, String deltaPrefix, FileSystem fs, HdfsDirSnapshot dirSnapshot) throws IOException { - ParsedDelta p = parsedDelta(path, deltaPrefix, fs); + ParsedDelta p = parsedDelta(path, deltaPrefix, fs, dirSnapshot); boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX); return new ParsedDelta(p.getMinWriteId(), p.getMaxWriteId(), path, p.statementId, isDeleteDelta, p.isRawFormat(), p.visibilityTxnId); } - public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix, FileSystem fs) + public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix, FileSystem fs, HdfsDirSnapshot dirSnapshot) throws IOException { String filename = deltaDir.getName(); boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX); if (filename.startsWith(deltaPrefix)) { //small optimization - delete delta can't be in raw format - boolean isRawFormat = !isDeleteDelta && MetaDataFile.isRawFormat(deltaDir, fs); + boolean isRawFormat = !isDeleteDelta && MetaDataFile.isRawFormat(deltaDir, fs, dirSnapshot); return parsedDelta(deltaDir, isRawFormat); } throw new IllegalArgumentException(deltaDir + " does not start with " + @@ -1117,19 +1160,13 @@ public static boolean isAcid(FileSystem fileSystem, Path directory, return false; } - @VisibleForTesting - public static Directory getAcidState(Path directory, - Configuration conf, - ValidWriteIdList writeIdList) throws IOException { - return getAcidState(directory, conf, writeIdList, false, false); - } - /** State class for getChildState; cannot modify 2 things in a method. 
*/ private static class TxnBase { private FileStatus status; private long writeId = 0; private long oldestBaseWriteId = Long.MAX_VALUE; private Path oldestBase = null; + private HdfsDirSnapshot dirSnapShot; } /** @@ -1143,33 +1180,10 @@ public static Directory getAcidState(Path directory, * @return the state of the directory * @throws IOException */ - public static Directory getAcidState(Path directory, Configuration conf, - ValidWriteIdList writeIdList, - boolean useFileIds, - boolean ignoreEmptyFiles) throws IOException { - return getAcidState(directory, conf, writeIdList, Ref.from(useFileIds), ignoreEmptyFiles, null); - } - - public static Directory getAcidState(FileSystem fileSystem, Path directory, Configuration conf, - ValidWriteIdList writeIdList, - boolean useFileIds, - boolean ignoreEmptyFiles) throws IOException { - return getAcidState(fileSystem, directory, conf, writeIdList, Ref.from(useFileIds), ignoreEmptyFiles, null); - } - - public static Directory getAcidState(Path directory, Configuration conf, - ValidWriteIdList writeIdList, - Ref useFileIds, - boolean ignoreEmptyFiles, - Map tblproperties) throws IOException { - return getAcidState(null, directory, conf, writeIdList, useFileIds, ignoreEmptyFiles, tblproperties); - } - - public static Directory getAcidState(FileSystem fileSystem, Path directory, Configuration conf, - ValidWriteIdList writeIdList, - Ref useFileIds, - boolean ignoreEmptyFiles, - Map tblproperties) throws IOException { + @VisibleForTesting + public static Directory getAcidState(FileSystem fileSystem, Path candidateDirectory, Configuration conf, + ValidWriteIdList writeIdList, Ref useFileIds, boolean ignoreEmptyFiles, + Map tblproperties, boolean generateDirSnapshots) throws IOException { ValidTxnList validTxnList = null; String s = conf.get(ValidTxnList.VALID_TXNS_KEY); if(!Strings.isNullOrEmpty(s)) { @@ -1187,30 +1201,36 @@ public static Directory getAcidState(FileSystem fileSystem, Path directory, Conf validTxnList.readFromString(s); } - FileSystem fs = fileSystem == null ? directory.getFileSystem(conf) : fileSystem; + FileSystem fs = fileSystem == null ? candidateDirectory.getFileSystem(conf) : fileSystem; // The following 'deltas' includes all kinds of delta files including insert & delete deltas. 
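[Editorial note, not part of the patch] With the old getAcidState overloads removed, every caller now goes through the single eight-argument method introduced above. A minimal caller sketch follows; the table location and write-id string are illustrative, and the fileSystem, useFileIds and tblproperties arguments may all be null, as several callers in this patch demonstrate.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hive.common.util.Ref;

public class GetAcidStateSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path partition = new Path("hdfs:///warehouse/acid_tbl/p=1");        // illustrative
    FileSystem fs = partition.getFileSystem(conf);
    ValidWriteIdList writeIds =
        new ValidReaderWriteIdList("acid_tbl:100:" + Long.MAX_VALUE + ":");

    // fileSystem, useFileIds and tblproperties are nullable; the final flag asks
    // getAcidState to build HdfsDirSnapshots from one recursive listing instead
    // of issuing a listStatus per directory.
    AcidUtils.Directory dir = AcidUtils.getAcidState(
        fs, partition, conf, writeIds,
        Ref.from(false),   // useFileIds
        true,              // ignoreEmptyFiles
        null,              // tblproperties
        true);             // generateDirSnapshots

    System.out.println("base = " + dir.getBaseDirectory()
        + ", deltas = " + dir.getCurrentDirectories().size());
  }
}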
final List deltas = new ArrayList(); List working = new ArrayList(); List originalDirectories = new ArrayList<>(); final List obsolete = new ArrayList<>(); final List abortedDirectories = new ArrayList<>(); - List childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, directory); + List childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, candidateDirectory); TxnBase bestBase = new TxnBase(); final List original = new ArrayList<>(); + List dirSnapshots = null; if (childrenWithId != null) { for (HdfsFileStatusWithId child : childrenWithId) { - getChildState(child.getFileStatus(), child, writeIdList, working, originalDirectories, original, - obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList); + getChildState(child.getFileStatus(), child, writeIdList, working, originalDirectories, original, obsolete, + bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList); } } else { - List children = HdfsUtils.listLocatedStatus(fs, directory, hiddenFileFilter); - for (FileStatus child : children) { - getChildState(child, null, writeIdList, working, originalDirectories, original, obsolete, + if (generateDirSnapshots) { + dirSnapshots = getHdfsDirSnapshots(fs, candidateDirectory); + getChildState(candidateDirectory, dirSnapshots, writeIdList, working, originalDirectories, original, obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList); + } else { + List children = HdfsUtils.listLocatedStatus(fs, candidateDirectory, hiddenFileFilter); + for (FileStatus child : children) { + getChildState(child, null, writeIdList, working, originalDirectories, original, obsolete, bestBase, + ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList); + } } } - // If we have a base, the original files are obsolete. if (bestBase.status != null) { // Add original files to obsolete list if any @@ -1224,13 +1244,16 @@ public static Directory getAcidState(FileSystem fileSystem, Path directory, Conf original.clear(); originalDirectories.clear(); } else { - // Okay, we're going to need these originals. Recurse through them and figure out what we - // really need. - for (Path origDir : originalDirectories) { - findOriginals(fs, origDir, original, useFileIds, ignoreEmptyFiles, true); + // Okay, we're going to need these originals. + // Recurse through them and figure out what we really need. + // If we already have the original list, do nothing + // If dirSnapshots != null, we would have already populated "original" + if (dirSnapshots == null) { + for (Path origDir : originalDirectories) { + findOriginals(fs, origDir, original, useFileIds, ignoreEmptyFiles, true); + } } } - Collections.sort(working); //so now, 'working' should be sorted like delta_5_20 delta_5_10 delta_11_20 delta_51_60 for example //and we want to end up with the best set containing all relevant data: delta_5_20 delta_51_60, @@ -1299,8 +1322,20 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId minOpenWriteId, bestBase.oldestBase.toString())); } - final Path base = bestBase.status == null ? 
null : bestBase.status.getPath(); - LOG.debug("in directory " + directory.toUri().toString() + " base = " + base + " deltas = " + + Path base = null; + boolean isBaseInRawFormat = false; + if (bestBase.status != null) { + base = bestBase.status.getPath(); + isBaseInRawFormat = MetaDataFile.isRawFormat(base, fs, null); + if (isBaseInRawFormat && (bestBase.dirSnapShot != null)) { + for (FileStatus stat : bestBase.dirSnapShot.getFiles()) { + if ((!ignoreEmptyFiles) || (stat.getLen() != 0)) { + original.add(createOriginalObj(null, stat)); + } + } + } + } + LOG.debug("in directory " + candidateDirectory.toUri().toString() + " base = " + base + " deltas = " + deltas.size()); /** * If this sort order is changed and there are tables that have been converted to transactional @@ -1313,12 +1348,228 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId //this does "Path.uri.compareTo(that.uri)" return o1.getFileStatus().compareTo(o2.getFileStatus()); }); - - // Note: isRawFormat is invalid for non-ORC tables. It will always return true, so we're good. - final boolean isBaseInRawFormat = base != null && MetaDataFile.isRawFormat(base, fs); return new DirectoryImpl(abortedDirectories, isBaseInRawFormat, original, obsolete, deltas, base); } + + public static List getHdfsDirSnapshots(final FileSystem fs, final Path path) throws IOException { + try { + Map dirToSnapshots = new HashMap(); + RemoteIterator itr = fs.listFiles(path, true); + while (itr.hasNext()) { + FileStatus fStatus = itr.next(); + Path fPath = fStatus.getPath(); + if (acidHiddenFileFilter.accept(fPath)) { + if (fStatus.isDirectory() && acidTempDirFilter.accept(fPath)) { + HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(fPath); + if (dirSnapshot == null) { + dirSnapshot = new HdfsDirSnapshotImpl(fPath, fStatus); + dirToSnapshots.put(fPath, dirSnapshot); + } + } else { + Path parentDirPath = fPath.getParent(); + if (acidTempDirFilter.accept(parentDirPath)) { + FileStatus parentDirFStatus = fs.getFileStatus(parentDirPath); + HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(parentDirPath); + if (dirSnapshot == null) { + dirSnapshot = new HdfsDirSnapshotImpl(parentDirPath, parentDirFStatus); + dirToSnapshots.put(parentDirPath, dirSnapshot); + } + // We're not filtering out the metadata file and acid format file, as they represent parts of a valid snapshot + // We're not using the cached values downstream, but we can potentially optimize more in a follow-up task + if (fStatus.getPath().toString().contains(MetaDataFile.METADATA_FILE)) { + dirSnapshot.addMetadataFile(fStatus); + } else if (fStatus.getPath().toString().contains(OrcAcidVersion.ACID_FORMAT)) { + dirSnapshot.addOrcAcidFormatFile(fStatus); + } else { + dirSnapshot.addFile(fStatus); + } + } + } + } + } + return new ArrayList(dirToSnapshots.values()); + } catch (IOException e) { + e.printStackTrace(); + throw new IOException(e); + } + } + + /** + * DFS dir listing. 
+ * Captures a dir and the corresponding list of files it contains, + * with additional properties about the dir (like isBase etc) + * + */ + public static interface HdfsDirSnapshot { + public Path getPath(); + + public void addOrcAcidFormatFile(FileStatus fStatus); + + public FileStatus getOrcAcidFormatFile(); + + public void addMetadataFile(FileStatus fStatus); + + public FileStatus getMetadataFile(FileStatus fStatus); + + // FileStatus of this HDFS directory + public FileStatus getFileStatus(); + + // Get the list of files if any within this directory + public List getFiles(); + + public void setFileStatus(FileStatus fStatus); + + public void addFile(FileStatus file); + + // File id or null + public Long getFileId(); + + public Boolean isRawFormat(); + + public void setIsRawFormat(boolean isRawFormat); + + public Boolean isBase(); + + public void setIsBase(boolean isBase); + + public Boolean isValidBase(); + + public void setIsValidBase(boolean isValidBase); + + public Boolean isCompactedBase(); + + public void setIsCompactedBase(boolean isCompactedBase); + } + + public static class HdfsDirSnapshotImpl implements HdfsDirSnapshot { + private Path dirPath; + private FileStatus fStatus; + private FileStatus metadataFStatus = null; + private FileStatus orcAcidFormatFStatus = null; + private List files = new ArrayList(); + private Long fileId = null; + private Boolean isRawFormat = null; + private Boolean isBase = null; + private Boolean isValidBase = null; + private Boolean isCompactedBase = null; + + public HdfsDirSnapshotImpl(Path path, FileStatus fStatus, List files) { + this.dirPath = path; + this.fStatus = fStatus; + this.files = files; + } + + public HdfsDirSnapshotImpl(Path path, FileStatus fStatus) { + this.dirPath = path; + this.fStatus = fStatus; + } + + @Override + public Path getPath() { + return dirPath; + } + + @Override + public FileStatus getFileStatus() { + return fStatus; + } + + @Override + public void setFileStatus(FileStatus fStatus) { + this.fStatus = fStatus; + } + + @Override + public List getFiles() { + return files; + } + + @Override + public void addFile(FileStatus file) { + files.add(file); + } + + @Override + public Long getFileId() { + return fileId; + } + + @Override + public Boolean isRawFormat() { + return isRawFormat; + } + + @Override + public void setIsRawFormat(boolean isRawFormat) { + this.isRawFormat = isRawFormat; + } + + @Override + public Boolean isBase() { + return isBase; + } + + @Override + public Boolean isValidBase() { + return isValidBase; + } + + @Override + public Boolean isCompactedBase() { + return isCompactedBase; + } + + @Override + public void setIsBase(boolean isBase) { + this.isBase = isBase; + } + + @Override + public void setIsValidBase(boolean isValidBase) { + this.isValidBase = isValidBase; + } + + @Override + public void setIsCompactedBase(boolean isCompactedBase) { + this.isCompactedBase = isCompactedBase; + } + + @Override + public void addOrcAcidFormatFile(FileStatus fStatus) { + this.orcAcidFormatFStatus = fStatus; + } + + @Override + public FileStatus getOrcAcidFormatFile() { + return orcAcidFormatFStatus; + } + + @Override + public void addMetadataFile(FileStatus fStatus) { + this.metadataFStatus = fStatus; + } + + @Override + public FileStatus getMetadataFile(FileStatus fStatus) { + return metadataFStatus; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Path: " + dirPath); + sb.append("; "); + sb.append("Files: { "); + for (FileStatus fstatus : files) { + 
sb.append(fstatus); + sb.append(", "); + } + sb.append(" }"); + return sb.toString(); + } + } + /** * We can only use a 'base' if it doesn't have an open txn (from specific reader's point of view) * A 'base' with open txn in its range doesn't have 'enough history' info to produce a correct @@ -1342,6 +1593,19 @@ private static boolean isValidBase(ParsedBase parsedBase, ValidWriteIdList write //if here, it's a result of IOW return writeIdList.isWriteIdValid(parsedBase.getWriteId()); } + + private static boolean isValidBase(HdfsDirSnapshot dirSnapshot, ParsedBase parsedBase, ValidWriteIdList writeIdList, + FileSystem fs) throws IOException { + boolean isValidBase; + if (dirSnapshot.isValidBase() != null) { + isValidBase = dirSnapshot.isValidBase(); + } else { + isValidBase = isValidBase(parsedBase, writeIdList, fs); + dirSnapshot.setIsValidBase(isValidBase); + } + return isValidBase; + } + /** * Returns {@code true} if {@code parsedBase} was created by compaction. * As of Hive 4.0 we can tell if a directory is a result of compaction based on the @@ -1349,10 +1613,10 @@ private static boolean isValidBase(ParsedBase parsedBase, ValidWriteIdList write * that, have to rely on the {@link MetaDataFile} in the directory. So look at the filename first * since that is the cheaper test.*/ private static boolean isCompactedBase(ParsedBase parsedBase, FileSystem fs) throws IOException { - return parsedBase.getVisibilityTxnId() > 0 || - MetaDataFile.isCompacted(parsedBase.getBaseDirPath(), fs); + return parsedBase.getVisibilityTxnId() > 0 || MetaDataFile.isCompacted(parsedBase.getBaseDirPath(), fs); } + private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId, ValidWriteIdList writeIdList, List working, List originalDirectories, List original, List obsolete, TxnBase bestBase, @@ -1393,7 +1657,7 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi } } else if (fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX)) { String deltaPrefix = fn.startsWith(DELTA_PREFIX) ? 
DELTA_PREFIX : DELETE_DELTA_PREFIX; - ParsedDelta delta = parseDelta(child.getPath(), deltaPrefix, fs); + ParsedDelta delta = parseDelta(child.getPath(), deltaPrefix, fs, null); if(!isDirUsable(child.getPath(), delta.getVisibilityTxnId(), aborted, validTxnList)) { return; } @@ -1413,6 +1677,75 @@ else if (writeIdList.isWriteIdRangeValid( originalDirectories.add(child.getPath()); } } + + private static void getChildState(Path candidateDirectory, List dirSnapshots, ValidWriteIdList writeIdList, + List working, List originalDirectories, List original, + List obsolete, TxnBase bestBase, boolean ignoreEmptyFiles, List aborted, + Map tblproperties, FileSystem fs, ValidTxnList validTxnList) throws IOException { + for (HdfsDirSnapshot dirSnapshot : dirSnapshots) { + FileStatus fStat = dirSnapshot.getFileStatus(); + Path dirPath = dirSnapshot.getPath(); + String dirName = dirPath.getName(); + if (dirName.startsWith(BASE_PREFIX)) { + bestBase.dirSnapShot = dirSnapshot; + ParsedBase parsedBase = ParsedBase.parseBase(dirPath); + if (!isDirUsable(dirPath, parsedBase.getVisibilityTxnId(), aborted, validTxnList)) { + continue; + } + final long writeId = parsedBase.getWriteId(); + if (bestBase.oldestBaseWriteId > writeId) { + //keep track for error reporting + bestBase.oldestBase = dirPath; + bestBase.oldestBaseWriteId = writeId; + } + if (bestBase.status == null) { + if (isValidBase(dirSnapshot, parsedBase, writeIdList, fs)) { + bestBase.status = fStat; + bestBase.writeId = writeId; + } + } else if (bestBase.writeId < writeId) { + if (isValidBase(dirSnapshot, parsedBase, writeIdList, fs)) { + obsolete.add(bestBase.status.getPath()); + bestBase.status = fStat; + bestBase.writeId = writeId; + } + } else { + obsolete.add(dirPath); + } + } else if (dirName.startsWith(DELTA_PREFIX) || dirName.startsWith(DELETE_DELTA_PREFIX)) { + String deltaPrefix = dirName.startsWith(DELTA_PREFIX) ? 
DELTA_PREFIX : DELETE_DELTA_PREFIX; + ParsedDelta delta = parseDelta(dirPath, deltaPrefix, fs, dirSnapshot); + if (!isDirUsable(dirPath, delta.getVisibilityTxnId(), aborted, validTxnList)) { + continue; + } + if (ValidWriteIdList.RangeResponse.ALL == writeIdList.isWriteIdRangeAborted(delta.minWriteId, + delta.maxWriteId)) { + aborted.add(dirPath); + } else if (writeIdList.isWriteIdRangeValid(delta.minWriteId, + delta.maxWriteId) != ValidWriteIdList.RangeResponse.NONE) { + if (delta.isRawFormat) { + for (FileStatus stat : dirSnapshot.getFiles()) { + if ((!ignoreEmptyFiles) || (stat.getLen() != 0)) { + original.add(createOriginalObj(null, stat)); + } + } + } else { + working.add(delta); + } + } + } else { + if (!candidateDirectory.equals(dirPath)) { + originalDirectories.add(dirPath); + } + for (FileStatus stat : dirSnapshot.getFiles()) { + if ((!ignoreEmptyFiles) || (stat.getLen() != 0)) { + original.add(createOriginalObj(null, stat)); + } + } + } + } + } + /** * checks {@code visibilityTxnId} to see if {@code child} is committed in current snapshot */ @@ -1494,10 +1827,13 @@ public static void findOriginals(FileSystem fs, Path dir, } } - private static List tryListLocatedHdfsStatus(Ref useFileIds, - FileSystem fs, Path directory) { - Boolean val = useFileIds.value; + private static List tryListLocatedHdfsStatus(Ref useFileIds, FileSystem fs, + Path directory) { List childrenWithId = null; + if (useFileIds == null) { + return childrenWithId; + } + Boolean val = useFileIds.value; if (val == null || val) { try { childrenWithId = SHIMS.listLocatedHdfsStatus(fs, directory, hiddenFileFilter); @@ -2012,7 +2348,7 @@ public static String getFullTableName(String dbName, String tableName) { */ public static class MetaDataFile { //export command uses _metadata.... - private static final String METADATA_FILE = "_metadata_acid"; + public static final String METADATA_FILE = "_metadata_acid"; private static final String CURRENT_VERSION = "0"; //todo: enums? that have both field name and value list private interface Field { @@ -2077,8 +2413,9 @@ private static Path chooseFile(Path baseOrDeltaDir, FileSystem fs) throws IOExce * This is only meaningful for full CRUD tables - Insert-only tables have all their data * in raw format by definition. * @param baseOrDeltaDir base or delta file. 
+ * @param dirSnapshot */ - public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs) throws IOException { + public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs, HdfsDirSnapshot dirSnapshot) throws IOException { //todo: this could be optimized - for full CRUD table only base_x and delta_x_x could have // files in raw format delta_x_y (x != y) whether from streaming ingested or compaction // must be native Acid format by definition @@ -2099,13 +2436,19 @@ public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs) throws IOE } } //if here, have to check the files - Path dataFile = chooseFile(baseOrDeltaDir, fs); + Path dataFile; + if ((dirSnapshot != null) && (dirSnapshot.getFiles() != null) && (dirSnapshot.getFiles().size() > 0)) { + dataFile = dirSnapshot.getFiles().get(0).getPath(); + } else { + dataFile = chooseFile(baseOrDeltaDir, fs); + } if (dataFile == null) { //directory is empty or doesn't have any that could have been produced by load data return false; } return isRawFormatFile(dataFile, fs); } + public static boolean isRawFormatFile(Path dataFile, FileSystem fs) throws IOException { try { Reader reader = OrcFile.createReader(dataFile, OrcFile.readerOptions(fs.getConf())); @@ -2227,7 +2570,7 @@ public static int getAcidVersionFromMetaFile(Path deltaOrBaseDir, FileSystem fs) + " from " + jc.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY)); return null; } - Directory acidInfo = AcidUtils.getAcidState(dir, jc, idList); + Directory acidInfo = AcidUtils.getAcidState(fs, dir, jc, idList, null, false, null, true); // Assume that for an MM table, or if there's only the base directory, we are good. if (!acidInfo.getCurrentDirectories().isEmpty() && AcidUtils.isFullAcidTable(table)) { Utilities.FILE_OP_LOGGER.warn( @@ -2264,7 +2607,8 @@ public static int getAcidVersionFromMetaFile(Path deltaOrBaseDir, FileSystem fs) // If ACID/MM tables, then need to find the valid state wrt to given ValidWriteIdList. 
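[Editorial note, not part of the patch] The new dirSnapshot argument lets MetaDataFile.isRawFormat probe the first file already captured in an HdfsDirSnapshot and fall back to chooseFile(fs) only when no snapshot files are available. A rough usage sketch under that assumption; the table location is illustrative.

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;

public class RawFormatProbeSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path table = new Path("hdfs:///warehouse/acid_tbl");   // illustrative location
    FileSystem fs = table.getFileSystem(conf);

    // One recursive listing builds a snapshot per base_x / delta_x_y directory.
    List<AcidUtils.HdfsDirSnapshot> snapshots = AcidUtils.getHdfsDirSnapshots(fs, table);
    for (AcidUtils.HdfsDirSnapshot snapshot : snapshots) {
      Path dir = snapshot.getPath();
      if (dir.getName().startsWith(AcidUtils.DELTA_PREFIX)) {
        // With the snapshot passed in, isRawFormat inspects snapshot.getFiles()
        // first and only issues another listing when the snapshot has no files.
        boolean raw = AcidUtils.MetaDataFile.isRawFormat(dir, fs, snapshot);
        System.out.println(dir + " raw format: " + raw);
      }
    }
  }
}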
ValidWriteIdList validWriteIdList = new ValidReaderWriteIdList(validWriteIdStr); - Directory acidInfo = AcidUtils.getAcidState(dataPath, conf, validWriteIdList); + Directory acidInfo = AcidUtils.getAcidState(dataPath.getFileSystem(conf), dataPath, conf, validWriteIdList, null, + false, null, false); for (HdfsFileStatusWithId hfs : acidInfo.getOriginalFiles()) { pathList.add(hfs.getFileStatus().getPath()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java index 9d5ba3d310..4de5c8cff4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java @@ -26,8 +26,9 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; - +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index cff7e04b9a..7e71c7719a 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -568,12 +568,12 @@ public static void processPathsForMmRead(List dirs, Configuration conf, } boolean allowOriginals = HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_ALLOW_ORIGINALS); for (Path dir : dirs) { - processForWriteIds( + processForWriteIdsForMmRead( dir, conf, validWriteIdList, allowOriginals, finalPaths, pathsWithFileOriginals); } } - private static void processForWriteIds(Path dir, Configuration conf, + private static void processForWriteIdsForMmRead(Path dir, Configuration conf, ValidWriteIdList validWriteIdList, boolean allowOriginals, List finalPaths, List pathsWithFileOriginals) throws IOException { FileSystem fs = dir.getFileSystem(conf); @@ -605,7 +605,7 @@ private static void processForWriteIds(Path dir, Configuration conf, } if (hasAcidDirs) { AcidUtils.Directory dirInfo = AcidUtils.getAcidState( - fs, dir, conf, validWriteIdList, Ref.from(false), true, null); + fs, dir, conf, validWriteIdList, Ref.from(false), true, null, false); // Find the base, created for IOW. Path base = dirInfo.getBaseDirectory(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 707e38c321..ec4994d4d3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -1273,7 +1273,7 @@ private AcidDirInfo callInternal() throws IOException { } //todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine? 
AcidUtils.Directory dirInfo = AcidUtils.getAcidState( - fs, dir, context.conf, context.writeIdList, useFileIds, true, null); + fs, dir, context.conf, context.writeIdList, useFileIds, true, null, false); // find the base files (original or new style) List baseFiles = new ArrayList<>(); if (dirInfo.getBaseDirectory() == null) { @@ -2463,7 +2463,7 @@ private static boolean isStripeSatisfyPredicate( //it may also be null if there is no base - only deltas mergerOptions.baseDir(baseDirectory); if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) { - isOriginal = AcidUtils.MetaDataFile.isRawFormat(baseDirectory, baseDirectory.getFileSystem(conf)); + isOriginal = AcidUtils.MetaDataFile.isRawFormat(baseDirectory, baseDirectory.getFileSystem(conf), null); mergerOptions.rootPath(baseDirectory.getParent()); } else { isOriginal = true; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index b1ede0556f..2ac6232460 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; +import org.apache.hive.common.util.Ref; import com.google.common.annotations.VisibleForTesting; @@ -462,9 +463,8 @@ static int encodeBucketId(Configuration conf, int bucketId, int statementId) { * contents to be in {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory#getOriginalFiles()} */ //the split is from something other than the 1st file of the logical bucket - compute offset - AcidUtils.Directory directoryState - = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf, validWriteIdList, false, - true); + AcidUtils.Directory directoryState = AcidUtils.getAcidState(null, mergerOptions.getRootPath(), conf, + validWriteIdList, Ref.from(false), true, null, true); for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) { int bucketIdFromPath = AcidUtils.parseBucketId(f.getFileStatus().getPath()); if (bucketIdFromPath != bucketId) { @@ -577,8 +577,8 @@ public void next(OrcStruct next) throws IOException { //when compacting each split needs to process the whole logical bucket assert options.getOffset() == 0; assert options.getMaxOffset() == Long.MAX_VALUE; - AcidUtils.Directory directoryState - = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf, validWriteIdList, false, true); + AcidUtils.Directory directoryState = AcidUtils.getAcidState(null, mergerOptions.getRootPath(), conf, + validWriteIdList, Ref.from(false), true, null, true); /** * Note that for reading base_x/ or delta_x_x/ with non-acid schema, * {@link Options#getRootPath()} is set to base_x/ or delta_x_x/ which causes all it's diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index 15f1f945ce..374b1058e4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -46,6 +46,7 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; +import org.apache.hive.common.util.Ref; import org.apache.orc.ColumnStatistics; import 
org.apache.orc.IntegerColumnStatistics; import org.apache.orc.OrcConf; @@ -722,8 +723,8 @@ dropped by the Reader (I guess because of orc.impl.SchemaEvolution) int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(conf) //statementId is from directory name (or 0 if there is none) .statementId(syntheticTxnInfo.statementId).bucket(bucketId)); - AcidUtils.Directory directoryState = AcidUtils.getAcidState(syntheticTxnInfo.folder, conf, - validWriteIdList, false, true); + AcidUtils.Directory directoryState = AcidUtils.getAcidState(null, syntheticTxnInfo.folder, conf, + validWriteIdList, Ref.from(false), true, null, true); for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) { int bucketIdFromPath = AcidUtils.parseBucketId(f.getFileStatus().getPath()); if (bucketIdFromPath != bucketId) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 9d631ed43d..463236135c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -123,7 +123,7 @@ public class SessionState { private static final Logger LOG = LoggerFactory.getLogger(SessionState.class); - private static final String TMP_PREFIX = "_tmp_space.db"; + public static final String TMP_PREFIX = "_tmp_space.db"; private static final String LOCAL_SESSION_PATH_KEY = "_hive.local.session.path"; private static final String HDFS_SESSION_PATH_KEY = "_hive.hdfs.session.path"; private static final String TMP_TABLE_SPACE_KEY = "_hive.tmp_table_space"; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 57eb506996..5dfa7ca974 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; +import org.apache.hive.common.util.Ref; import java.io.IOException; import java.security.PrivilegedExceptionAction; @@ -225,7 +226,8 @@ private static String idWatermark(CompactionInfo ci) { private void removeFiles(String location, ValidWriteIdList writeIdList, CompactionInfo ci) throws IOException, NoSuchObjectException { Path locPath = new Path(location); - AcidUtils.Directory dir = AcidUtils.getAcidState(locPath, conf, writeIdList); + AcidUtils.Directory dir = AcidUtils.getAcidState(locPath.getFileSystem(conf), locPath, conf, writeIdList, Ref.from( + false), false, null, false); List obsoleteDirs = dir.getObsolete(); /** * add anything in 'dir' that only has data from aborted transactions - no one should be diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 67a5e6de46..7c79d7ba18 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -267,8 +267,8 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor // and discovering that in getSplits is too late as we then have no way to pass it to our // mapper. 
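[Editorial note, not part of the patch] TMP_PREFIX is widened to public so that the new AcidUtils.acidTempDirFilter can recognize the session temp-table space. The sketch below illustrates the intended behavior of the two filters added to AcidUtils earlier in this patch; all paths are made up, and "_orc_acid_version" is assumed to be the OrcAcidVersion.ACID_FORMAT marker name.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.session.SessionState;

public class AcidFilterSketch {
  public static void main(String[] args) {
    // acidHiddenFileFilter keeps the ACID metadata markers that the plain
    // hiddenFileFilter would drop, but still hides other "_"/"." entries.
    System.out.println(AcidUtils.acidHiddenFileFilter.accept(
        new Path("/warehouse/t/base_5/_metadata_acid")));      // true (METADATA_FILE)
    System.out.println(AcidUtils.acidHiddenFileFilter.accept(
        new Path("/warehouse/t/base_5/_orc_acid_version")));   // true (assumed ACID_FORMAT name)
    System.out.println(AcidUtils.acidHiddenFileFilter.accept(
        new Path("/warehouse/t/base_5/.bucket_00000.crc")));   // false

    // acidTempDirFilter skips staging/hidden directories but keeps the
    // session temp-table space, which is why TMP_PREFIX is now public.
    System.out.println(AcidUtils.acidTempDirFilter.accept(
        new Path("/warehouse/t/delta_1_1")));                        // true
    System.out.println(AcidUtils.acidTempDirFilter.accept(
        new Path("/warehouse/t/.hive-staging_hive_2019-01-01")));    // false (contains "/.")
    System.out.println(AcidUtils.acidTempDirFilter.accept(
        new Path("/tmp/" + SessionState.TMP_PREFIX + "/t1")));       // true (TMP_PREFIX)
  }
}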
- AcidUtils.Directory dir = AcidUtils.getAcidState( - new Path(sd.getLocation()), conf, writeIds, false, true); + AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds, Ref.from(false), true, + null, false); if (!isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) { return; @@ -298,7 +298,7 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor maxDeltasToHandle, -1, conf, msc, ci.id, jobName); } //now recompute state since we've done minor compactions and have different 'best' set of deltas - dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, writeIds); + dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds, Ref.from(false), false, null, false); } StringableList dirsToSearch = new StringableList(); @@ -341,9 +341,8 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor private void runCrudCompaction(HiveConf hiveConf, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds, CompactionInfo ci) throws IOException { AcidUtils.setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(t.getParameters())); - AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), hiveConf, writeIds, - Ref.from(false), false, - t.getParameters()); + AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), hiveConf, writeIds, Ref.from( + false), false, t.getParameters(), false); if (!isEnoughToCompact(dir, sd)) { return; @@ -446,8 +445,8 @@ private void runMmCompaction(HiveConf conf, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds, CompactionInfo ci) throws IOException { LOG.debug("Going to delete directories for aborted transactions for MM table " + t.getDbName() + "." + t.getTableName()); - AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), - conf, writeIds, Ref.from(false), false, t.getParameters()); + AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds, Ref.from(false), + false, t.getParameters(), false); removeFilesForMmTable(conf, dir); // Then, actually do the compaction. @@ -1036,7 +1035,7 @@ public String toString() { dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) { boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX); boolean isRawFormat = !dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX) - && AcidUtils.MetaDataFile.isRawFormat(dir, fs);//deltes can't be raw format + && AcidUtils.MetaDataFile.isRawFormat(dir, fs, null);//deltes can't be raw format FileStatus[] files = fs.listStatus(dir, isRawFormat ? 
AcidUtils.originalBucketFilter : AcidUtils.bucketFileFilter); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 6168fc0f79..5fb255272c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; +import org.apache.hive.common.util.Ref; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -291,7 +292,7 @@ private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdLi boolean noBase = false; Path location = new Path(sd.getLocation()); FileSystem fs = location.getFileSystem(conf); - AcidUtils.Directory dir = AcidUtils.getAcidState(fs, location, conf, writeIds, false, false); + AcidUtils.Directory dir = AcidUtils.getAcidState(fs, location, conf, writeIds, Ref.from(false), false, tblproperties, false); Path base = dir.getBaseDirectory(); long baseSize = 0; FileStatus stat = null; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index d4abf4277b..dddf063cca 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -394,6 +394,7 @@ public void testNonAcidToAcidConversion02() throws Exception { //run Compaction runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'"); TestTxnCommands2.runWorker(hiveConf); + //TestTxnCommands2.runCleaner(hiveConf); rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by a,b"); LOG.warn("after compact"); for(String s : rs) { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java index ea31557741..e4f0529761 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java @@ -18,14 +18,16 @@ package org.apache.hadoop.hive.ql.io; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.util.BitSet; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; import org.apache.hadoop.hive.common.ValidReadTxnList; @@ -161,9 +163,8 @@ public void testOriginal() throws Exception { new MockFile("mock:/tbl/part1/random", 500, new byte[0]), new MockFile("mock:/tbl/part1/_done", 0, new byte[0]), new MockFile("mock:/tbl/part1/subdir/000000_0", 0, new byte[0])); - AcidUtils.Directory dir = - AcidUtils.getAcidState(new MockPath(fs, "/tbl/part1"), conf, - new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":")); + AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "/tbl/part1"), conf, + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false, null, false); assertEquals(null, dir.getBaseDirectory()); assertEquals(0, dir.getCurrentDirectories().size()); assertEquals(0, dir.getObsolete().size()); @@ -198,9 +199,8 @@ public void testOriginalDeltas() 
throws Exception { new MockFile("mock:/tbl/part1/delta_101_101/bucket_0", 0, new byte[0])); conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); - AcidUtils.Directory dir = - AcidUtils.getAcidState(new MockPath(fs, - "mock:/tbl/part1"), conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":")); + AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf, + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false, null, false); assertEquals(null, dir.getBaseDirectory()); List obsolete = dir.getObsolete(); assertEquals(2, obsolete.size()); @@ -242,17 +242,20 @@ public void testBaseDeltas() throws Exception { new MockFile("mock:/tbl/part1/delta_90_120/bucket_0", 0, new byte[0])); conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); - AcidUtils.Directory dir = - AcidUtils.getAcidState(new MockPath(fs, - "mock:/tbl/part1"), conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":")); + AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf, + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false, null, false); assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString()); - List obsolete = dir.getObsolete(); - assertEquals(5, obsolete.size()); - assertEquals("mock:/tbl/part1/base_10", obsolete.get(0).toString()); - assertEquals("mock:/tbl/part1/base_5", obsolete.get(1).toString()); - assertEquals("mock:/tbl/part1/delta_025_030", obsolete.get(2).toString()); - assertEquals("mock:/tbl/part1/delta_025_025", obsolete.get(3).toString()); - assertEquals("mock:/tbl/part1/delta_029_029", obsolete.get(4).toString()); + List obsoletes = dir.getObsolete(); + assertEquals(5, obsoletes.size()); + Set obsoletePathNames = new HashSet(); + for (Path obsolete : obsoletes) { + obsoletePathNames.add(obsolete.toString()); + } + assertTrue(obsoletePathNames.contains("mock:/tbl/part1/base_5")); + assertTrue(obsoletePathNames.contains("mock:/tbl/part1/base_10")); + assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_025_030")); + assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_025_025")); + assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_029_029")); assertEquals(0, dir.getOriginalFiles().size()); List deltas = dir.getCurrentDirectories(); assertEquals(1, deltas.size()); @@ -273,12 +276,12 @@ public void testObsoleteOriginals() throws Exception { Path part = new MockPath(fs, "/tbl/part1"); conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); - AcidUtils.Directory dir = - AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:150:" + Long.MAX_VALUE + ":")); + AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:150:" + + Long.MAX_VALUE + ":"), null, false, null, false); // Obsolete list should include the two original bucket files, and the old base dir - List obsolete = dir.getObsolete(); - assertEquals(3, obsolete.size()); - assertEquals("mock:/tbl/part1/base_5", obsolete.get(0).toString()); + List obsoletes = dir.getObsolete(); + assertEquals(3, obsoletes.size()); + assertEquals("mock:/tbl/part1/base_5", obsoletes.get(0).toString()); assertEquals("mock:/tbl/part1/base_10", dir.getBaseDirectory().toString()); } @@ -296,8 +299,8 @@ public void testOverlapingDelta() 
throws Exception { Path part = new MockPath(fs, "mock:/tbl/part1"); conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); - AcidUtils.Directory dir = - AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":")); + AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:" + + Long.MAX_VALUE + ":"), null, false, null, false); assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString()); List obsolete = dir.getObsolete(); assertEquals(2, obsolete.size()); @@ -333,8 +336,8 @@ public void testOverlapingDelta2() throws Exception { Path part = new MockPath(fs, "mock:/tbl/part1"); conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); - AcidUtils.Directory dir - = AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":")); + AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:" + + Long.MAX_VALUE + ":"), null, false, null, false); assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString()); List obsolete = dir.getObsolete(); assertEquals(5, obsolete.size()); @@ -362,7 +365,8 @@ public void deltasWithOpenTxnInRead() throws Exception { //hypothetically, txn 50 is open and writing write ID 4 conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 55).writeToString()); - AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:4:4")); + AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:4:4"), null, + false, null, false); List delts = dir.getCurrentDirectories(); assertEquals(2, delts.size()); assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString()); @@ -386,7 +390,8 @@ public void deltasWithOpenTxnInRead2() throws Exception { //hypothetically, txn 50 is open and writing write ID 4 conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 55).writeToString()); - AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:4:4")); + AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:4:4"), null, + false, null, false); List delts = dir.getCurrentDirectories(); assertEquals(2, delts.size()); assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString()); @@ -402,8 +407,8 @@ public void deltasWithOpenTxnsNotInCompact() throws Exception { Path part = new MockPath(fs, "mock:/tbl/part1"); conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); - AcidUtils.Directory dir = - AcidUtils.getAcidState(part, conf, new ValidCompactorWriteIdList("tbl:4:" + Long.MAX_VALUE)); + AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidCompactorWriteIdList("tbl:4:" + + Long.MAX_VALUE), null, false, null, false); List delts = dir.getCurrentDirectories(); assertEquals(1, delts.size()); assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString()); @@ -421,8 +426,8 @@ public void deltasWithOpenTxnsNotInCompact2() throws Exception { Path part = new MockPath(fs, "mock:/tbl/part1"); conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); 
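[Editorial note, not part of the patch] Every converted test seeds ValidTxnList.VALID_TXNS_KEY before calling getAcidState, because getAcidState consults that list (through isDirUsable) to decide whether directories carrying a _v<txnId> visibility suffix are committed. A minimal, stand-alone version of that setup, mirroring the construction the tests use:

import java.util.BitSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;

public class ValidTxnsSetupSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // No exception txns, nothing aborted, high-water mark 1000: visibility
    // txn ids at or below the high-water mark are treated as committed when
    // getAcidState later reads this key from the configuration.
    ValidTxnList txns = new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE);
    conf.set(ValidTxnList.VALID_TXNS_KEY, txns.writeToString());
    System.out.println(conf.get(ValidTxnList.VALID_TXNS_KEY));
  }
}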
-    AcidUtils.Directory dir =
-        AcidUtils.getAcidState(part, conf, new ValidCompactorWriteIdList("tbl:3:" + Long.MAX_VALUE));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidCompactorWriteIdList("tbl:3:" +
+        Long.MAX_VALUE), null, false, null, false);
     List delts = dir.getCurrentDirectories();
     assertEquals(1, delts.size());
     assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -447,19 +452,22 @@ public void testBaseWithDeleteDeltas() throws Exception {
         new MockFile("mock:/tbl/part1/delete_delta_110_110/bucket_0", 0, new byte[0]));
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
-    AcidUtils.Directory dir =
-        AcidUtils.getAcidState(new MockPath(fs,
-        "mock:/tbl/part1"), conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf,
+        new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false, null, false);
     assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString());
-    List obsolete = dir.getObsolete();
-    assertEquals(7, obsolete.size());
-    assertEquals("mock:/tbl/part1/base_10", obsolete.get(0).toString());
-    assertEquals("mock:/tbl/part1/base_5", obsolete.get(1).toString());
-    assertEquals("mock:/tbl/part1/delete_delta_025_030", obsolete.get(2).toString());
-    assertEquals("mock:/tbl/part1/delta_025_030", obsolete.get(3).toString());
-    assertEquals("mock:/tbl/part1/delta_025_025", obsolete.get(4).toString());
-    assertEquals("mock:/tbl/part1/delete_delta_029_029", obsolete.get(5).toString());
-    assertEquals("mock:/tbl/part1/delta_029_029", obsolete.get(6).toString());
+    List obsoletes = dir.getObsolete();
+    assertEquals(7, obsoletes.size());
+    Set obsoletePathNames = new HashSet();
+    for (Path obsolete : obsoletes) {
+      obsoletePathNames.add(obsolete.toString());
+    }
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/base_5"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/base_10"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delete_delta_025_030"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_025_030"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_025_025"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delete_delta_029_029"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_029_029"));
     assertEquals(0, dir.getOriginalFiles().size());
     List deltas = dir.getCurrentDirectories();
     assertEquals(2, deltas.size());
@@ -487,8 +495,8 @@ public void testOverlapingDeltaAndDeleteDelta() throws Exception {
     Path part = new MockPath(fs, "mock:/tbl/part1");
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
-    AcidUtils.Directory dir =
-        AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:" +
+        Long.MAX_VALUE + ":"), null, false, null, false);
     assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
     List obsolete = dir.getObsolete();
     assertEquals(3, obsolete.size());
@@ -518,8 +526,8 @@ public void testMinorCompactedDeltaMakesInBetweenDelteDeltaObsolete() throws Exc
     Path part = new MockPath(fs, "mock:/tbl/part1");
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
-    AcidUtils.Directory dir =
-        AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:" +
+        Long.MAX_VALUE + ":"), null, false, null, false);
     List obsolete = dir.getObsolete();
     assertEquals(1, obsolete.size());
     assertEquals("mock:/tbl/part1/delete_delta_50_50", obsolete.get(0).toString());
@@ -547,8 +555,8 @@ public void deltasAndDeleteDeltasWithOpenTxnsNotInCompact() throws Exception {
     Path part = new MockPath(fs, "mock:/tbl/part1");
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
-    AcidUtils.Directory dir =
-        AcidUtils.getAcidState(part, conf, new ValidCompactorWriteIdList("tbl:4:" + Long.MAX_VALUE + ":"));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidCompactorWriteIdList("tbl:4:" +
+        Long.MAX_VALUE + ":"), null, false, null, false);
     List delts = dir.getCurrentDirectories();
     assertEquals(2, delts.size());
     assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -572,7 +580,8 @@ public void deleteDeltasWithOpenTxnInRead() throws Exception {
     //hypothetically, txn 50 is open and writing write ID 4
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 55).writeToString());
-    AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:4:4"));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:4:4"), null,
+        false, null, false);
     List delts = dir.getCurrentDirectories();
     assertEquals(3, delts.size());
     assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index b5958fa9cc..97a18d0d95 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -748,13 +748,13 @@ public void testFSCallsVectorizedOrcAcidRowBatchReader() throws IOException {
     try {
       MockFileSystem fs = new MockFileSystem(conf);
       MockFileSystem.addGlobalFile(
-        new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")));
+        new MockFile("mock:/a/base_0000001/bucket_00000", 1000, new byte[1], new MockBlock("host1")));
       MockFileSystem.addGlobalFile(
-        new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
+        new MockFile("mock:/a/base_0000001/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
       MockFileSystem.addGlobalFile(
-        new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00002", 1000, new byte[1], new MockBlock("host1")));
+        new MockFile("mock:/a/base_0000001/bucket_00002", 1000, new byte[1], new MockBlock("host1")));
       MockFileSystem.addGlobalFile(
-        new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00003", 1000, new byte[1], new MockBlock("host1")));
+        new MockFile("mock:/a/base_0000001/bucket_00003", 1000, new byte[1], new MockBlock("host1")));
       MockFileSystem.addGlobalFile(
         new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")));
       MockFileSystem.addGlobalFile(
@@ -763,6 +763,14 @@ public void testFSCallsVectorizedOrcAcidRowBatchReader() throws IOException {
         new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00002", 1000, new byte[1], new MockBlock("host1")));
       MockFileSystem.addGlobalFile(
         new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00003", 1000, new byte[1], new MockBlock("host1")));
+      MockFileSystem.addGlobalFile(
+        new MockFile("mock:/a/delta_0000003_0000003_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")));
+      MockFileSystem.addGlobalFile(
+        new MockFile("mock:/a/delta_0000003_0000003_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
+      MockFileSystem.addGlobalFile(
+        new MockFile("mock:/a/delta_0000003_0000003_0000/bucket_00002", 1000, new byte[1], new MockBlock("host1")));
+      MockFileSystem.addGlobalFile(
+        new MockFile("mock:/a/delta_0000003_0000003_0000/bucket_00003", 1000, new byte[1], new MockBlock("host1")));
       conf.set("bucket_count", "4");

       //set up props for read
@@ -803,7 +811,7 @@ public void testFSCallsVectorizedOrcAcidRowBatchReader() throws IOException {
       System.out.println("STATS TRACE END - " + testCaseName.getMethodName());
       int delta = readsAfter - readsBefore;
       //HIVE-16812 adds 1 read of the footer of each file (only if delete delta exists)
-      assertEquals(8, delta);
+      assertEquals(12, delta);
     } finally {
       MockFileSystem.clearGlobalFiles();
     }
@@ -2812,7 +2820,7 @@ public void testSplitGenReadOps() throws Exception {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: listLocatedStatus - mock:/mocktable
+    // call-1: getAcidState - mock:/mocktable
     // call-2: open - mock:/mocktable/0_0
     // call-3: open - mock:/mocktable/0_1
     assertEquals(3, readOpsDelta);
@@ -2870,7 +2878,7 @@ public void testSplitGenReadOpsLocalCache() throws Exception {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: listLocatedStatus - mock:/mocktbl
+    // call-1: getAcidState - mock:/mocktbl
     // call-2: open - mock:/mocktbl/0_0
     // call-3: open - mock:/mocktbl/0_1
     assertEquals(3, readOpsDelta);
@@ -2890,7 +2898,7 @@ public void testSplitGenReadOpsLocalCache() throws Exception {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: listLocatedStatus - mock:/mocktbl
+    // call-1: getAcidState - mock:/mocktbl
     assertEquals(1, readOpsDelta);

     // enable cache and use default strategy
@@ -2909,7 +2917,7 @@ public void testSplitGenReadOpsLocalCache() throws Exception {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: listLocatedStatus - mock:/mocktbl
+    // call-1: getAcidState - mock:/mocktbl
     // call-2: open - mock:/mocktbl/0_0
     // call-3: open - mock:/mocktbl/0_1
     assertEquals(3, readOpsDelta);
@@ -2927,7 +2935,7 @@ public void testSplitGenReadOpsLocalCache() throws Exception {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: listLocatedStatus - mock:/mocktbl
+    // call-1: getAcidState - mock:/mocktbl
     assertEquals(1, readOpsDelta);

     // revert back to local fs
@@ -2981,7 +2989,7 @@ public void testSplitGenReadOpsLocalCacheChangeFileLen() throws Exception {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: listLocatedStatus - mock:/mocktable
+    // call-1: getAcidState - mock:/mocktable
     // call-2: open - mock:/mocktbl1/0_0
     // call-3: open - mock:/mocktbl1/0_1
     assertEquals(3, readOpsDelta);
@@ -3020,7 +3028,7 @@ public void testSplitGenReadOpsLocalCacheChangeFileLen() throws Exception {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: listLocatedStatus - mock:/mocktable
+    // call-1: getAcidState - mock:/mocktable
     // call-2: open - mock:/mocktbl1/0_0
     // call-3: open - mock:/mocktbl1/0_1
     assertEquals(3, readOpsDelta);
@@ -3038,7 +3046,7 @@ public void testSplitGenReadOpsLocalCacheChangeFileLen() throws Exception {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: listLocatedStatus - mock:/mocktbl1
+    // call-1: getAcidState - mock:/mocktbl1
     assertEquals(1, readOpsDelta);

     // revert back to local fs
@@ -3093,7 +3101,7 @@ public void testSplitGenReadOpsLocalCacheChangeModificationTime() throws Excepti
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: listLocatedStatus - mock:/mocktbl2
+    // call-1: getAcidState - mock:/mocktbl2
     // call-2: open - mock:/mocktbl2/0_0
     // call-3: open - mock:/mocktbl2/0_1
     assertEquals(3, readOpsDelta);
@@ -3115,7 +3123,7 @@ public void testSplitGenReadOpsLocalCacheChangeModificationTime() throws Excepti
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: listLocatedStatus - mock:/mocktbl2
+    // call-1: getAcidState - mock:/mocktbl2
     // call-2: open - mock:/mocktbl2/0_1
     assertEquals(2, readOpsDelta);
@@ -3136,7 +3144,7 @@ public void testSplitGenReadOpsLocalCacheChangeModificationTime() throws Excepti
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: listLocatedStatus - mock:/mocktbl2
+    // call-1: getAcidState - mock:/mocktbl2
     // call-2: open - mock:/mocktbl2/0_0
     assertEquals(2, readOpsDelta);
@@ -3153,7 +3161,7 @@ public void testSplitGenReadOpsLocalCacheChangeModificationTime() throws Excepti
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: listLocatedStatus - mock:/mocktbl2
+    // call-1: getAcidState - mock:/mocktbl2
     assertEquals(1, readOpsDelta);

     // revert back to local fs
@@ -3525,7 +3533,7 @@ public void testACIDReaderNoFooterSerialize() throws Exception {
     // call-6: getAcidState - split 2 => mock:/mocktable5 (to compute offset for original read)
     // call-7: open to read footer - split 2 => mock:/mocktable5/0_0 (to get row count)
     // call-8: file status - split 2 => mock:/mocktable5/0_0
-    assertEquals(8, readOpsDelta);
+    assertEquals(12, readOpsDelta);

     // revert back to local fs
     conf.set("fs.defaultFS", "file:///");
@@ -3604,7 +3612,7 @@ public void testACIDReaderFooterSerialize() throws Exception {
     // call-4: AcidUtils.getAcidState - split 2 => ls mock:/mocktable6
     // call-5: read footer - split 2 => mock:/mocktable6/0_0 (to get offset since it's original file)
     // call-6: file stat - split 2 => mock:/mocktable6/0_0
-    assertEquals(6, readOpsDelta);
+    assertEquals(10, readOpsDelta);

     // revert back to local fs
     conf.set("fs.defaultFS", "file:///");
@@ -3679,7 +3687,7 @@ public void testACIDReaderNoFooterSerializeWithDeltas() throws Exception {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    assertEquals(7, readOpsDelta);
+    assertEquals(12, readOpsDelta);

     // revert back to local fs
     conf.set("fs.defaultFS", "file:///");
@@ -3753,7 +3761,7 @@ public void testACIDReaderFooterSerializeWithDeltas() throws Exception {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    assertEquals(5, readOpsDelta);
+    assertEquals(10, readOpsDelta);

     // revert back to local fs
     conf.set("fs.defaultFS", "file:///");
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 8451462023..d4fd7732cf 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -595,7 +595,7 @@ public void testEmpty() throws Exception {
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
     ValidWriteIdList writeIdList = new ValidReaderWriteIdList("testEmpty:200:" + Long.MAX_VALUE);
-    AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, writeIdList);
+    AcidUtils.Directory directory = AcidUtils.getAcidState(fs, root, conf, writeIdList, null, false, null, false);

     Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(), BUCKET);
@@ -666,7 +666,8 @@ private void testNewBaseAndDelta(boolean use130Format) throws Exception {
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
     ValidWriteIdList writeIdList = new ValidReaderWriteIdList("testNewBaseAndDelta:200:" + Long.MAX_VALUE);
-    AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, writeIdList);
+    AcidUtils.Directory directory = AcidUtils.getAcidState(fs, root, conf, writeIdList, null, use130Format, null,
+        use130Format);

     assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory());
     assertEquals(new Path(root, use130Format ?
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index c6d7e7f27c..dbff263aed 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -945,7 +945,7 @@ public void testTableValidation() throws Exception {
   private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
       String... records) throws Exception {
     ValidWriteIdList writeIds = getTransactionContext(conf);
-    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, writeIds);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, writeIds, null, false, null, false);
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
     List current = dir.getCurrentDirectories();
@@ -999,7 +999,7 @@ private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int
   */
  private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles,
      String validationQuery, boolean vectorize, String... records) throws Exception {
-    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, getTransactionContext(conf));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null, false, null, false);
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
     List current = dir.getCurrentDirectories();
@@ -1050,7 +1050,7 @@ private ValidWriteIdList getTransactionContext(Configuration conf) throws Except
     return TxnCommonUtils.createValidReaderWriteIdList(v.get(0));
   }
   private void checkNothingWritten(Path partitionPath) throws Exception {
-    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, getTransactionContext(conf));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null, false, null, false);
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
     List current = dir.getCurrentDirectories();
@@ -1947,7 +1947,7 @@ public void testInterleavedTransactionBatchCommits() throws Exception {
     /*now both batches have committed (but not closed) so we for each primary file we expect a side
     file to exist and indicate the true length of primary file*/
     FileSystem fs = partLoc.getFileSystem(conf);
-    AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf, getTransactionContext(conf));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false, null, false);
     for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
       for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
         Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
@@ -1972,7 +1972,7 @@ public void testInterleavedTransactionBatchCommits() throws Exception {
     //so each of 2 deltas has 1 bucket0 and 1 bucket0_flush_length. Furthermore, each bucket0
     //has now received more data(logically - it's buffered) but it is not yet committed.
     //lets check that side files exist, etc
-    dir = AcidUtils.getAcidState(partLoc, conf, getTransactionContext(conf));
+    dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false, null, false);
     for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
       for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
         Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());