diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java index d59cfe51e9..b3722018db 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java @@ -345,7 +345,7 @@ public void testCompactionWithSchemaEvolutionNoBucketsMultipleReducers() throws // Clean up executeStatementOnDriver("drop table " + tblName, driver); } - + private void runCompaction(String dbName, String tblName, CompactionType compactionType, String... partNames) throws Exception { HiveConf hiveConf = new HiveConf(conf); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 295fe7cbd0..ad6638178a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.io; import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD; +import static org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator.UNION_SUDBIR_PREFIX; import java.io.IOException; import java.io.Serializable; @@ -27,6 +28,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -40,8 +42,10 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hive.common.*; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -183,6 +187,25 @@ public boolean accept(Path p){ return !name.startsWith("_") && !name.startsWith("."); } }; + + public static final PathFilter acidHiddenFileFilter = new PathFilter() { + @Override + public boolean accept(Path p) { + String name = p.getName(); + // Don't filter out MetaDataFile.METADATA_FILE + if (name.startsWith(MetaDataFile.METADATA_FILE)) { + return true; + } + // Don't filter out OrcAcidVersion.ACID_FORMAT + if (name.startsWith(OrcAcidVersion.ACID_FORMAT)) { + return true; + } + return !name.startsWith("_") && !name.startsWith("."); + } + }; + + + public static final String VISIBILITY_PREFIX = "_v"; public static final Pattern VISIBILITY_PATTERN = Pattern.compile(VISIBILITY_PREFIX + "\\d+"); @@ -422,7 +445,7 @@ else if (filename.startsWith(BUCKET_PREFIX)) { .writingBase(true); } else if (bucketFile.getParent().getName().startsWith(DELTA_PREFIX)) { ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELTA_PREFIX, - bucketFile.getFileSystem(conf)); + bucketFile.getFileSystem(conf), null); result .setOldStyle(false) .minimumWriteId(parsedDelta.minWriteId) @@ -430,7 +453,7 @@ else if (filename.startsWith(BUCKET_PREFIX)) { .bucket(bucket); } else if (bucketFile.getParent().getName().startsWith(DELETE_DELTA_PREFIX)) { ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELETE_DELTA_PREFIX, - bucketFile.getFileSystem(conf)); + bucketFile.getFileSystem(conf), null); result .setOldStyle(false) .minimumWriteId(parsedDelta.minWriteId) @@ -493,6 +516,12 @@ public boolean 
isBaseInRawFormat() { public List getAbortedDirectories() { return abortedDirectories; } + + @Override + public String toString() { + return "Aborted Directories: " + abortedDirectories + "; isBaseInRawFormat: " + isBaseInRawFormat + "; original: " + + original + "; obsolete: " + obsolete + "; deltas: " + deltas + "; base: " + base; + } } //This is used for (full) Acid tables. InsertOnly use NOT_ACID @@ -1029,26 +1058,26 @@ else if(statementId != parsedDelta.statementId) { public static ParsedDelta parsedDelta(Path deltaDir, FileSystem fs) throws IOException { String deltaDirName = deltaDir.getName(); if (deltaDirName.startsWith(DELETE_DELTA_PREFIX)) { - return parsedDelta(deltaDir, DELETE_DELTA_PREFIX, fs); + return parsedDelta(deltaDir, DELETE_DELTA_PREFIX, fs, null); } - return parsedDelta(deltaDir, DELTA_PREFIX, fs); // default prefix is delta_prefix + return parsedDelta(deltaDir, DELTA_PREFIX, fs, null); // default prefix is delta_prefix } - private static ParsedDelta parseDelta(Path path, String deltaPrefix, FileSystem fs) + private static ParsedDelta parseDelta(Path path, String deltaPrefix, FileSystem fs, HdfsDirSnapshot dirSnapshot) throws IOException { - ParsedDelta p = parsedDelta(path, deltaPrefix, fs); + ParsedDelta p = parsedDelta(path, deltaPrefix, fs, dirSnapshot); boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX); return new ParsedDelta(p.getMinWriteId(), p.getMaxWriteId(), path, p.statementId, isDeleteDelta, p.isRawFormat(), p.visibilityTxnId); } - public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix, FileSystem fs) + public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix, FileSystem fs, HdfsDirSnapshot dirSnapshot) throws IOException { String filename = deltaDir.getName(); boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX); if (filename.startsWith(deltaPrefix)) { //small optimization - delete delta can't be in raw format - boolean isRawFormat = !isDeleteDelta && MetaDataFile.isRawFormat(deltaDir, fs); + boolean isRawFormat = !isDeleteDelta && MetaDataFile.isRawFormat(deltaDir, fs, dirSnapshot); // vg: cache this in dirsnapshot? 
return parsedDelta(deltaDir, isRawFormat); } throw new IllegalArgumentException(deltaDir + " does not start with " + @@ -1143,33 +1172,24 @@ public static Directory getAcidState(Path directory, * @return the state of the directory * @throws IOException */ - public static Directory getAcidState(Path directory, Configuration conf, - ValidWriteIdList writeIdList, - boolean useFileIds, - boolean ignoreEmptyFiles) throws IOException { + public static Directory getAcidState(Path directory, Configuration conf, ValidWriteIdList writeIdList, + boolean useFileIds, boolean ignoreEmptyFiles) throws IOException { return getAcidState(directory, conf, writeIdList, Ref.from(useFileIds), ignoreEmptyFiles, null); } public static Directory getAcidState(FileSystem fileSystem, Path directory, Configuration conf, - ValidWriteIdList writeIdList, - boolean useFileIds, - boolean ignoreEmptyFiles) throws IOException { - return getAcidState(fileSystem, directory, conf, writeIdList, Ref.from(useFileIds), ignoreEmptyFiles, null); + ValidWriteIdList writeIdList, boolean useFileIds, boolean ignoreEmptyFiles) throws IOException { + return getAcidState(fileSystem, directory, conf, writeIdList, Ref.from(useFileIds), ignoreEmptyFiles, null, false); } - public static Directory getAcidState(Path directory, Configuration conf, - ValidWriteIdList writeIdList, - Ref useFileIds, - boolean ignoreEmptyFiles, - Map tblproperties) throws IOException { - return getAcidState(null, directory, conf, writeIdList, useFileIds, ignoreEmptyFiles, tblproperties); + public static Directory getAcidState(Path directory, Configuration conf, ValidWriteIdList writeIdList, + Ref useFileIds, boolean ignoreEmptyFiles, Map tblproperties) throws IOException { + return getAcidState(null, directory, conf, writeIdList, useFileIds, ignoreEmptyFiles, tblproperties, false); } - public static Directory getAcidState(FileSystem fileSystem, Path directory, Configuration conf, - ValidWriteIdList writeIdList, - Ref useFileIds, - boolean ignoreEmptyFiles, - Map tblproperties) throws IOException { + public static Directory getAcidState(FileSystem fileSystem, Path candidateDirectory, Configuration conf, + ValidWriteIdList writeIdList, Ref useFileIds, boolean ignoreEmptyFiles, + Map tblproperties, boolean generateDirSnapshots) throws IOException { ValidTxnList validTxnList = null; String s = conf.get(ValidTxnList.VALID_TXNS_KEY); if(!Strings.isNullOrEmpty(s)) { @@ -1187,30 +1207,36 @@ public static Directory getAcidState(FileSystem fileSystem, Path directory, Conf validTxnList.readFromString(s); } - FileSystem fs = fileSystem == null ? directory.getFileSystem(conf) : fileSystem; + FileSystem fs = fileSystem == null ? candidateDirectory.getFileSystem(conf) : fileSystem; // The following 'deltas' includes all kinds of delta files including insert & delete deltas. 
final List deltas = new ArrayList(); List working = new ArrayList(); List originalDirectories = new ArrayList<>(); final List obsolete = new ArrayList<>(); final List abortedDirectories = new ArrayList<>(); - List childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, directory); + List childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, candidateDirectory); TxnBase bestBase = new TxnBase(); final List original = new ArrayList<>(); + List dirSnapshots = null; if (childrenWithId != null) { for (HdfsFileStatusWithId child : childrenWithId) { - getChildState(child.getFileStatus(), child, writeIdList, working, originalDirectories, original, - obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList); + getChildState(child.getFileStatus(), child, writeIdList, working, originalDirectories, original, obsolete, + bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList); } } else { - List children = HdfsUtils.listLocatedStatus(fs, directory, hiddenFileFilter); - for (FileStatus child : children) { - getChildState(child, null, writeIdList, working, originalDirectories, original, obsolete, + if (generateDirSnapshots) { + dirSnapshots = getHdfsDirSnapshots(fs, candidateDirectory); + getChildState(candidateDirectory, dirSnapshots, writeIdList, working, originalDirectories, original, obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList); + } else { + List children = HdfsUtils.listLocatedStatus(fs, candidateDirectory, hiddenFileFilter); + for (FileStatus child : children) { + getChildState(child, null, writeIdList, working, originalDirectories, original, obsolete, bestBase, + ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList); + } } } - // If we have a base, the original files are obsolete. if (bestBase.status != null) { // Add original files to obsolete list if any @@ -1224,13 +1250,16 @@ public static Directory getAcidState(FileSystem fileSystem, Path directory, Conf original.clear(); originalDirectories.clear(); } else { - // Okay, we're going to need these originals. Recurse through them and figure out what we - // really need. - for (Path origDir : originalDirectories) { - findOriginals(fs, origDir, original, useFileIds, ignoreEmptyFiles, true); + // Okay, we're going to need these originals. + // Recurse through them and figure out what we really need. + // If we already have the original list, do nothing + // If dirSnapshots != null, we would have already populated "original" + if (dirSnapshots == null) { + for (Path origDir : originalDirectories) { + findOriginals(fs, origDir, original, useFileIds, ignoreEmptyFiles, true); + } } } - Collections.sort(working); //so now, 'working' should be sorted like delta_5_20 delta_5_10 delta_11_20 delta_51_60 for example //and we want to end up with the best set containing all relevant data: delta_5_20 delta_51_60, @@ -1300,7 +1329,7 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId } final Path base = bestBase.status == null ? null : bestBase.status.getPath(); - LOG.debug("in directory " + directory.toUri().toString() + " base = " + base + " deltas = " + + LOG.debug("in directory " + candidateDirectory.toUri().toString() + " base = " + base + " deltas = " + deltas.size()); /** * If this sort order is changed and there are tables that have been converted to transactional @@ -1315,10 +1344,226 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId }); // Note: isRawFormat is invalid for non-ORC tables. 
   It will always return true, so we're good.
-    final boolean isBaseInRawFormat = base != null && MetaDataFile.isRawFormat(base, fs);
+    final boolean isBaseInRawFormat = base != null && MetaDataFile.isRawFormat(base, fs, null);
     return new DirectoryImpl(abortedDirectories, isBaseInRawFormat, original, obsolete, deltas, base);
   }
+
+  public static List<HdfsDirSnapshot> getHdfsDirSnapshots(final FileSystem fs, final Path path)
+      throws IOException {
+    try {
+      Map<Path, HdfsDirSnapshot> dirToSnapshots = new HashMap<>();
+      RemoteIterator<LocatedFileStatus> itr = fs.listFiles(path, true);
+      while (itr.hasNext()) {
+        FileStatus fStatus = itr.next();
+        Path fPath = fStatus.getPath();
+        if (acidHiddenFileFilter.accept(fPath)) {
+          if (fStatus.isDirectory()) {
+            HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(fPath);
+            if (dirSnapshot == null) {
+              dirSnapshot = new HdfsDirSnapshotImpl(fPath, fStatus);
+              dirToSnapshots.put(fPath, dirSnapshot);
+            }
+          } else {
+            Path parentDirPath = fPath.getParent();
+            FileStatus parentDirFStatus = fs.getFileStatus(parentDirPath);
+            HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(parentDirPath);
+            if (dirSnapshot == null) {
+              dirSnapshot = new HdfsDirSnapshotImpl(parentDirPath, parentDirFStatus);
+              dirToSnapshots.put(parentDirPath, dirSnapshot);
+            }
+            if (fStatus.getPath().toString().contains(MetaDataFile.METADATA_FILE)) {
+              dirSnapshot.addMetadataFile(fStatus);
+            } else if (fStatus.getPath().toString().contains(OrcAcidVersion.ACID_FORMAT)) {
+              dirSnapshot.addOrcAcidFormatFile(fStatus);
+            } else {
+              dirSnapshot.addFile(fStatus);
+            }
+          }
+        }
+      }
+      return new ArrayList<>(dirToSnapshots.values());
+    } catch (IOException e) {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * DFS dir listing.
+   * Captures a dir and the corresponding list of files it contains,
+   * with additional properties about the dir (like isBase etc)
+   *
+   */
+  public static interface HdfsDirSnapshot {
+    public Path getPath();
+
+    public void addOrcAcidFormatFile(FileStatus fStatus);
+
+    public FileStatus getOrcAcidFormatFile();
+
+    public void addMetadataFile(FileStatus fStatus);
+
+    public FileStatus getMetadataFile(FileStatus fStatus);
+
+    // FileStatus of this HDFS directory
+    public FileStatus getFileStatus();
+
+    // Get the list of files if any within this directory
+    public List<FileStatus> getFiles();
+
+    public void setFileStatus(FileStatus fStatus);
+
+    public void addFile(FileStatus file);
+
+    // File id or null
+    public Long getFileId();
+
+    public Boolean isRawFormat();
+
+    public void setIsRawFormat(boolean isRawFormat);
+
+    public Boolean isBase();
+
+    public void setIsBase(boolean isBase);
+
+    public Boolean isValidBase();
+
+    public void setIsValidBase(boolean isValidBase);
+
+    public Boolean isCompactedBase();
+
+    public void setIsCompactedBase(boolean isCompactedBase);
+  }
+
+  public static class HdfsDirSnapshotImpl implements HdfsDirSnapshot {
+    private Path dirPath;
+    private FileStatus fStatus;
+    private FileStatus metadataFStatus = null;
+    private FileStatus orcAcidFormatFStatus = null;
+    private List<FileStatus> files = new ArrayList<>();
+    private Long fileId = null;
+    private Boolean isRawFormat = null;
+    private Boolean isBase = null;
+    private Boolean isValidBase = null;
+    private Boolean isCompactedBase = null;
+
+    public HdfsDirSnapshotImpl(Path path, FileStatus fStatus, List<FileStatus> files) {
+      this.dirPath = path;
+      this.fStatus = fStatus;
+      this.files = files;
+    }
+
+    public HdfsDirSnapshotImpl(Path path, FileStatus fStatus) {
+      this.dirPath = path;
+      this.fStatus = fStatus;
+    }
+
+    @Override
+    public Path getPath() {
+      return dirPath;
+    }
+
+    @Override
+    public FileStatus getFileStatus() {
+      return fStatus;
+    }
+
+    @Override
+    public void setFileStatus(FileStatus fStatus) {
+      this.fStatus = fStatus;
+    }
+
+    @Override
+    public List<FileStatus> getFiles() {
+      return files;
+    }
+
+    @Override
+    public void addFile(FileStatus file) {
+      files.add(file);
+    }
+
+    @Override
+    public Long getFileId() {
+      return fileId;
+    }
+
+    @Override
+    public Boolean isRawFormat() {
+      return isRawFormat;
+    }
+
+    @Override
+    public void setIsRawFormat(boolean isRawFormat) {
+      this.isRawFormat = isRawFormat;
+    }
+
+    @Override
+    public Boolean isBase() {
+      return isBase;
+    }
+
+    @Override
+    public Boolean isValidBase() {
+      return isValidBase;
+    }
+
+    @Override
+    public Boolean isCompactedBase() {
+      return isCompactedBase;
+    }
+
+    @Override
+    public void setIsBase(boolean isBase) {
+      this.isBase = isBase;
+    }
+
+    @Override
+    public void setIsValidBase(boolean isValidBase) {
+      this.isValidBase = isValidBase;
+    }
+
+    @Override
+    public void setIsCompactedBase(boolean isCompactedBase) {
+      this.isCompactedBase = isCompactedBase;
+    }
+
+    @Override
+    public void addOrcAcidFormatFile(FileStatus fStatus) {
+      this.orcAcidFormatFStatus = fStatus;
+    }
+
+    @Override
+    public FileStatus getOrcAcidFormatFile() {
+      return orcAcidFormatFStatus;
+    }
+
+    @Override
+    public void addMetadataFile(FileStatus fStatus) {
+      this.metadataFStatus = fStatus;
+    }
+
+    @Override
+    public FileStatus getMetadataFile(FileStatus fStatus) {
+      return metadataFStatus;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("Path: " + dirPath);
+      sb.append("; ");
+      sb.append("Files: { ");
+      for (FileStatus fstatus : files) {
+        sb.append(fstatus);
+        sb.append(", ");
+      }
+      sb.append(" }");
+      return sb.toString();
+    }
+  }
+
   /**
    * We can only use a 'base' if it doesn't have an open txn (from specific reader's point of view)
    * A 'base' with open txn in its range doesn't have 'enough history' info to produce a correct
@@ -1342,6 +1587,19 @@ private static boolean isValidBase(ParsedBase parsedBase, ValidWriteIdList write
     //if here, it's a result of IOW
     return writeIdList.isWriteIdValid(parsedBase.getWriteId());
   }
+
+  private static boolean isValidBase(HdfsDirSnapshot dirSnapshot, ParsedBase parsedBase, ValidWriteIdList writeIdList,
+      FileSystem fs) throws IOException {
+    boolean isValidBase;
+    if (dirSnapshot.isValidBase() != null) {
+      isValidBase = dirSnapshot.isValidBase();
+    } else {
+      isValidBase = isValidBase(parsedBase, writeIdList, fs);
+      dirSnapshot.setIsValidBase(isValidBase);
+    }
+    return isValidBase;
+  }
+
   /**
    * Returns {@code true} if {@code parsedBase} was created by compaction.
    * As of Hive 4.0 we can tell if a directory is a result of compaction based on the
@@ -1349,10 +1607,10 @@ private static boolean isValidBase(ParsedBase parsedBase, ValidWriteIdList write
   * that, have to rely on the {@link MetaDataFile} in the directory.
So look at the filename first * since that is the cheaper test.*/ private static boolean isCompactedBase(ParsedBase parsedBase, FileSystem fs) throws IOException { - return parsedBase.getVisibilityTxnId() > 0 || - MetaDataFile.isCompacted(parsedBase.getBaseDirPath(), fs); + return parsedBase.getVisibilityTxnId() > 0 || MetaDataFile.isCompacted(parsedBase.getBaseDirPath(), fs); } + private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId, ValidWriteIdList writeIdList, List working, List originalDirectories, List original, List obsolete, TxnBase bestBase, @@ -1393,7 +1651,7 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi } } else if (fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX)) { String deltaPrefix = fn.startsWith(DELTA_PREFIX) ? DELTA_PREFIX : DELETE_DELTA_PREFIX; - ParsedDelta delta = parseDelta(child.getPath(), deltaPrefix, fs); + ParsedDelta delta = parseDelta(child.getPath(), deltaPrefix, fs, null); if(!isDirUsable(child.getPath(), delta.getVisibilityTxnId(), aborted, validTxnList)) { return; } @@ -1413,6 +1671,66 @@ else if (writeIdList.isWriteIdRangeValid( originalDirectories.add(child.getPath()); } } + + private static void getChildState(Path candidateDirectory, List dirSnapshots, ValidWriteIdList writeIdList, + List working, List originalDirectories, List original, + List obsolete, TxnBase bestBase, boolean ignoreEmptyFiles, List aborted, + Map tblproperties, FileSystem fs, ValidTxnList validTxnList) throws IOException { + for (HdfsDirSnapshot dirSnapshot : dirSnapshots) { + FileStatus fStat = dirSnapshot.getFileStatus(); + Path dirPath = dirSnapshot.getPath(); + String dirName = dirPath.getName(); + if (dirName.startsWith(BASE_PREFIX)) { + ParsedBase parsedBase = ParsedBase.parseBase(dirPath); + if (!isDirUsable(dirPath, parsedBase.getVisibilityTxnId(), aborted, validTxnList)) { + return; + } + final long writeId = parsedBase.getWriteId(); + if (bestBase.oldestBaseWriteId > writeId) { + //keep track for error reporting + bestBase.oldestBase = dirPath; + bestBase.oldestBaseWriteId = writeId; + } + if (bestBase.status == null) { + if (isValidBase(dirSnapshot, parsedBase, writeIdList, fs)) { + bestBase.status = fStat; + bestBase.writeId = writeId; + } + } else if (bestBase.writeId < writeId) { + if (isValidBase(dirSnapshot, parsedBase, writeIdList, fs)) { + obsolete.add(bestBase.status.getPath()); + bestBase.status = fStat; + bestBase.writeId = writeId; + } + } else { + obsolete.add(dirPath); + } + } else if (dirName.startsWith(DELTA_PREFIX) || dirName.startsWith(DELETE_DELTA_PREFIX)) { + String deltaPrefix = dirName.startsWith(DELTA_PREFIX) ? 
DELTA_PREFIX : DELETE_DELTA_PREFIX; + ParsedDelta delta = parseDelta(dirPath, deltaPrefix, fs, dirSnapshot); + if (!isDirUsable(dirPath, delta.getVisibilityTxnId(), aborted, validTxnList)) { + return; + } + if (ValidWriteIdList.RangeResponse.ALL == writeIdList.isWriteIdRangeAborted(delta.minWriteId, + delta.maxWriteId)) { + aborted.add(dirPath); + } else if (writeIdList.isWriteIdRangeValid(delta.minWriteId, + delta.maxWriteId) != ValidWriteIdList.RangeResponse.NONE) { + working.add(delta); + } + } else { + if (!candidateDirectory.equals(dirPath)) { + originalDirectories.add(dirPath); + } + for (FileStatus stat : dirSnapshot.getFiles()) { + if ((!ignoreEmptyFiles) || (stat.getLen() != 0)) { + original.add(createOriginalObj(null, stat)); + } + } + } + } + } + /** * checks {@code visibilityTxnId} to see if {@code child} is committed in current snapshot */ @@ -2012,7 +2330,7 @@ public static String getFullTableName(String dbName, String tableName) { */ public static class MetaDataFile { //export command uses _metadata.... - private static final String METADATA_FILE = "_metadata_acid"; + public static final String METADATA_FILE = "_metadata_acid"; private static final String CURRENT_VERSION = "0"; //todo: enums? that have both field name and value list private interface Field { @@ -2062,7 +2380,7 @@ static boolean isCompacted(Path baseOrDeltaDir, FileSystem fs) throws IOExceptio * This assumes that all files in the dir are of the same type: either written by an acid * write or Load Data. This should always be the case for an Acid table. */ - private static Path chooseFile(Path baseOrDeltaDir, FileSystem fs) throws IOException { + private static Path chooseFile(Path baseOrDeltaDir, FileSystem fs) throws IOException { // vg: pass hdfsdisnapshot to avoid fs.list? if(!(baseOrDeltaDir.getName().startsWith(BASE_PREFIX) || baseOrDeltaDir.getName().startsWith(DELTA_PREFIX))) { throw new IllegalArgumentException(baseOrDeltaDir + " is not a base/delta"); @@ -2077,8 +2395,9 @@ private static Path chooseFile(Path baseOrDeltaDir, FileSystem fs) throws IOExce * This is only meaningful for full CRUD tables - Insert-only tables have all their data * in raw format by definition. * @param baseOrDeltaDir base or delta file. 
+ * @param dirSnapshot */ - public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs) throws IOException { + public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs, HdfsDirSnapshot dirSnapshot) throws IOException { //todo: this could be optimized - for full CRUD table only base_x and delta_x_x could have // files in raw format delta_x_y (x != y) whether from streaming ingested or compaction // must be native Acid format by definition @@ -2099,13 +2418,19 @@ public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs) throws IOE } } //if here, have to check the files - Path dataFile = chooseFile(baseOrDeltaDir, fs); + Path dataFile; + if ((dirSnapshot != null) && (dirSnapshot.getFiles() != null) && (dirSnapshot.getFiles().size() > 0)) { + dataFile = dirSnapshot.getFiles().get(0).getPath(); + } else { + dataFile = chooseFile(baseOrDeltaDir, fs); + } if (dataFile == null) { //directory is empty or doesn't have any that could have been produced by load data return false; } return isRawFormatFile(dataFile, fs); } + public static boolean isRawFormatFile(Path dataFile, FileSystem fs) throws IOException { try { Reader reader = OrcFile.createReader(dataFile, OrcFile.readerOptions(fs.getConf())); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java index 9d5ba3d310..4de5c8cff4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java @@ -26,8 +26,9 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; - +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index cff7e04b9a..67159225a2 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -605,7 +605,7 @@ private static void processForWriteIds(Path dir, Configuration conf, } if (hasAcidDirs) { AcidUtils.Directory dirInfo = AcidUtils.getAcidState( - fs, dir, conf, validWriteIdList, Ref.from(false), true, null); + fs, dir, conf, validWriteIdList, Ref.from(false), true, null, false); // Find the base, created for IOW. Path base = dirInfo.getBaseDirectory(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 707e38c321..7c4e5e6351 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -1273,7 +1273,7 @@ private AcidDirInfo callInternal() throws IOException { } //todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine? 
AcidUtils.Directory dirInfo = AcidUtils.getAcidState( - fs, dir, context.conf, context.writeIdList, useFileIds, true, null); + fs, dir, context.conf, context.writeIdList, useFileIds, true, null, context.isAcid); // find the base files (original or new style) List baseFiles = new ArrayList<>(); if (dirInfo.getBaseDirectory() == null) { @@ -2463,7 +2463,7 @@ private static boolean isStripeSatisfyPredicate( //it may also be null if there is no base - only deltas mergerOptions.baseDir(baseDirectory); if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) { - isOriginal = AcidUtils.MetaDataFile.isRawFormat(baseDirectory, baseDirectory.getFileSystem(conf)); + isOriginal = AcidUtils.MetaDataFile.isRawFormat(baseDirectory, baseDirectory.getFileSystem(conf), null); mergerOptions.rootPath(baseDirectory.getParent()); } else { isOriginal = true; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 9d631ed43d..fbb5522fbd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -126,7 +126,7 @@ private static final String TMP_PREFIX = "_tmp_space.db"; private static final String LOCAL_SESSION_PATH_KEY = "_hive.local.session.path"; private static final String HDFS_SESSION_PATH_KEY = "_hive.hdfs.session.path"; - private static final String TMP_TABLE_SPACE_KEY = "_hive.tmp_table_space"; + public static final String TMP_TABLE_SPACE_KEY = "_hive.tmp_table_space"; static final String LOCK_FILE_NAME = "inuse.lck"; static final String INFO_FILE_NAME = "inuse.info"; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 67a5e6de46..88420eecd1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -1036,7 +1036,7 @@ public String toString() { dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) { boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX); boolean isRawFormat = !dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX) - && AcidUtils.MetaDataFile.isRawFormat(dir, fs);//deltes can't be raw format + && AcidUtils.MetaDataFile.isRawFormat(dir, fs, null);//deltes can't be raw format FileStatus[] files = fs.listStatus(dir, isRawFormat ? 
AcidUtils.originalBucketFilter : AcidUtils.bucketFileFilter); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index d4abf4277b..dddf063cca 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -394,6 +394,7 @@ public void testNonAcidToAcidConversion02() throws Exception { //run Compaction runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'"); TestTxnCommands2.runWorker(hiveConf); + //TestTxnCommands2.runCleaner(hiveConf); rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by a,b"); LOG.warn("after compact"); for(String s : rs) { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java index c5faec5e95..8506ecbbd4 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java @@ -18,14 +18,16 @@ package org.apache.hadoop.hive.ql.io; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.util.BitSet; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; import org.apache.hadoop.hive.common.ValidReadTxnList; @@ -246,13 +248,17 @@ public void testBaseDeltas() throws Exception { AcidUtils.getAcidState(new MockPath(fs, "mock:/tbl/part1"), conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":")); assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString()); - List obsolete = dir.getObsolete(); - assertEquals(5, obsolete.size()); - assertEquals("mock:/tbl/part1/base_10", obsolete.get(0).toString()); - assertEquals("mock:/tbl/part1/base_5", obsolete.get(1).toString()); - assertEquals("mock:/tbl/part1/delta_025_030", obsolete.get(2).toString()); - assertEquals("mock:/tbl/part1/delta_025_025", obsolete.get(3).toString()); - assertEquals("mock:/tbl/part1/delta_029_029", obsolete.get(4).toString()); + List obsoletes = dir.getObsolete(); + assertEquals(5, obsoletes.size()); + Set obsoletePathNames = new HashSet(); + for (Path obsolete : obsoletes) { + obsoletePathNames.add(obsolete.toString()); + } + assertTrue(obsoletePathNames.contains("mock:/tbl/part1/base_5")); + assertTrue(obsoletePathNames.contains("mock:/tbl/part1/base_10")); + assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_025_030")); + assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_025_025")); + assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_029_029")); assertEquals(0, dir.getOriginalFiles().size()); List deltas = dir.getCurrentDirectories(); assertEquals(1, deltas.size()); @@ -276,9 +282,9 @@ public void testObsoleteOriginals() throws Exception { AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:150:" + Long.MAX_VALUE + ":")); // Obsolete list should include the two original bucket files, and the old base dir - List obsolete = dir.getObsolete(); - assertEquals(3, obsolete.size()); - assertEquals("mock:/tbl/part1/base_5", obsolete.get(0).toString()); + List obsoletes = dir.getObsolete(); + assertEquals(3, obsoletes.size()); + 
assertEquals("mock:/tbl/part1/base_5", obsoletes.get(0).toString()); assertEquals("mock:/tbl/part1/base_10", dir.getBaseDirectory().toString()); } @@ -451,15 +457,19 @@ public void testBaseWithDeleteDeltas() throws Exception { AcidUtils.getAcidState(new MockPath(fs, "mock:/tbl/part1"), conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":")); assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString()); - List obsolete = dir.getObsolete(); - assertEquals(7, obsolete.size()); - assertEquals("mock:/tbl/part1/base_10", obsolete.get(0).toString()); - assertEquals("mock:/tbl/part1/base_5", obsolete.get(1).toString()); - assertEquals("mock:/tbl/part1/delete_delta_025_030", obsolete.get(2).toString()); - assertEquals("mock:/tbl/part1/delta_025_030", obsolete.get(3).toString()); - assertEquals("mock:/tbl/part1/delta_025_025", obsolete.get(4).toString()); - assertEquals("mock:/tbl/part1/delete_delta_029_029", obsolete.get(5).toString()); - assertEquals("mock:/tbl/part1/delta_029_029", obsolete.get(6).toString()); + List obsoletes = dir.getObsolete(); + assertEquals(7, obsoletes.size()); + Set obsoletePathNames = new HashSet(); + for (Path obsolete : obsoletes) { + obsoletePathNames.add(obsolete.toString()); + } + assertTrue(obsoletePathNames.contains("mock:/tbl/part1/base_5")); + assertTrue(obsoletePathNames.contains("mock:/tbl/part1/base_10")); + assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delete_delta_025_030")); + assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_025_030")); + assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_025_025")); + assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delete_delta_029_029")); + assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_029_029")); assertEquals(0, dir.getOriginalFiles().size()); List deltas = dir.getCurrentDirectories(); assertEquals(2, deltas.size()); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index b5958fa9cc..e32d2cde77 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -2812,10 +2812,10 @@ public void testSplitGenReadOps() throws Exception { readOpsDelta = statistics.getReadOps() - readOpsBefore; } } - // call-1: listLocatedStatus - mock:/mocktable + // call-1: getAcidState - mock:/mocktable - 3 calls due to recursive listing // call-2: open - mock:/mocktable/0_0 // call-3: open - mock:/mocktable/0_1 - assertEquals(3, readOpsDelta); + assertEquals(5, readOpsDelta); assertEquals(2, splits.length); // revert back to local fs @@ -2870,10 +2870,10 @@ public void testSplitGenReadOpsLocalCache() throws Exception { readOpsDelta = statistics.getReadOps() - readOpsBefore; } } - // call-1: listLocatedStatus - mock:/mocktbl + // call-1: getAcidState - mock:/mocktbl - 3 calls due to recursive listing // call-2: open - mock:/mocktbl/0_0 // call-3: open - mock:/mocktbl/0_1 - assertEquals(3, readOpsDelta); + assertEquals(5, readOpsDelta); // force BI to avoid reading footers conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI"); @@ -2890,8 +2890,8 @@ public void testSplitGenReadOpsLocalCache() throws Exception { readOpsDelta = statistics.getReadOps() - readOpsBefore; } } - // call-1: listLocatedStatus - mock:/mocktbl - assertEquals(1, readOpsDelta); + // call-1: getAcidState - mock:/mocktbl - 3 calls due to recursive listing + assertEquals(3, 
readOpsDelta); // enable cache and use default strategy conf.set(ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE.varname, "10Mb"); @@ -2909,10 +2909,10 @@ public void testSplitGenReadOpsLocalCache() throws Exception { readOpsDelta = statistics.getReadOps() - readOpsBefore; } } - // call-1: listLocatedStatus - mock:/mocktbl + // call-1: getAcidState - mock:/mocktbl - 3 calls due to recursive listing // call-2: open - mock:/mocktbl/0_0 // call-3: open - mock:/mocktbl/0_1 - assertEquals(3, readOpsDelta); + assertEquals(5, readOpsDelta); for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) { if (statistics.getScheme().equalsIgnoreCase("mock")) { @@ -2927,8 +2927,8 @@ public void testSplitGenReadOpsLocalCache() throws Exception { readOpsDelta = statistics.getReadOps() - readOpsBefore; } } - // call-1: listLocatedStatus - mock:/mocktbl - assertEquals(1, readOpsDelta); + // call-1: getAcidState - mock:/mocktbl - 3 calls due to recursive listing + assertEquals(3, readOpsDelta); // revert back to local fs conf.set("fs.defaultFS", "file:///"); @@ -2981,10 +2981,10 @@ public void testSplitGenReadOpsLocalCacheChangeFileLen() throws Exception { readOpsDelta = statistics.getReadOps() - readOpsBefore; } } - // call-1: listLocatedStatus - mock:/mocktable + // call-1: getAcidState - mock:/mocktable - 3 calls due to recursive listing // call-2: open - mock:/mocktbl1/0_0 // call-3: open - mock:/mocktbl1/0_1 - assertEquals(3, readOpsDelta); + assertEquals(5, readOpsDelta); // change file length and look for cache misses @@ -3020,10 +3020,10 @@ public void testSplitGenReadOpsLocalCacheChangeFileLen() throws Exception { readOpsDelta = statistics.getReadOps() - readOpsBefore; } } - // call-1: listLocatedStatus - mock:/mocktable + // call-1: getAcidState - mock:/mocktable - 3 calls due to recursive listing // call-2: open - mock:/mocktbl1/0_0 // call-3: open - mock:/mocktbl1/0_1 - assertEquals(3, readOpsDelta); + assertEquals(5, readOpsDelta); for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) { if (statistics.getScheme().equalsIgnoreCase("mock")) { @@ -3038,8 +3038,8 @@ public void testSplitGenReadOpsLocalCacheChangeFileLen() throws Exception { readOpsDelta = statistics.getReadOps() - readOpsBefore; } } - // call-1: listLocatedStatus - mock:/mocktbl1 - assertEquals(1, readOpsDelta); + // call-1: getAcidState - mock:/mocktbl1 - 3 calls due to recursive listing + assertEquals(3, readOpsDelta); // revert back to local fs conf.set("fs.defaultFS", "file:///"); @@ -3093,10 +3093,10 @@ public void testSplitGenReadOpsLocalCacheChangeModificationTime() throws Excepti readOpsDelta = statistics.getReadOps() - readOpsBefore; } } - // call-1: listLocatedStatus - mock:/mocktbl2 + // call-1: getAcidState - mock:/mocktbl2 - 3 calls due to recursive listing // call-2: open - mock:/mocktbl2/0_0 // call-3: open - mock:/mocktbl2/0_1 - assertEquals(3, readOpsDelta); + assertEquals(5, readOpsDelta); // change file modification time and look for cache misses FileSystem fs1 = FileSystem.get(conf); @@ -3115,9 +3115,9 @@ public void testSplitGenReadOpsLocalCacheChangeModificationTime() throws Excepti readOpsDelta = statistics.getReadOps() - readOpsBefore; } } - // call-1: listLocatedStatus - mock:/mocktbl2 + // call-1: getAcidState - mock:/mocktbl2 - 3 calls due to recursive listing // call-2: open - mock:/mocktbl2/0_1 - assertEquals(2, readOpsDelta); + assertEquals(4, readOpsDelta); // touch the next file fs1 = FileSystem.get(conf); @@ -3136,9 +3136,9 @@ public void 
testSplitGenReadOpsLocalCacheChangeModificationTime() throws Excepti readOpsDelta = statistics.getReadOps() - readOpsBefore; } } - // call-1: listLocatedStatus - mock:/mocktbl2 + // call-1: getAcidState - mock:/mocktbl2 - 3 calls due to recursive listing // call-2: open - mock:/mocktbl2/0_0 - assertEquals(2, readOpsDelta); + assertEquals(4, readOpsDelta); for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) { if (statistics.getScheme().equalsIgnoreCase("mock")) { @@ -3153,8 +3153,8 @@ public void testSplitGenReadOpsLocalCacheChangeModificationTime() throws Excepti readOpsDelta = statistics.getReadOps() - readOpsBefore; } } - // call-1: listLocatedStatus - mock:/mocktbl2 - assertEquals(1, readOpsDelta); + // call-1: getAcidState - mock:/mocktbl2 - 3 calls due to recursive listing + assertEquals(3, readOpsDelta); // revert back to local fs conf.set("fs.defaultFS", "file:///"); @@ -3519,13 +3519,13 @@ public void testACIDReaderNoFooterSerialize() throws Exception { } // call-1: open to read footer - split 1 => mock:/mocktable5/0_0 // call-2: open to read data - split 1 => mock:/mocktable5/0_0 - // call-3: getAcidState - split 1 => mock:/mocktable5 (to compute offset for original read) + // call-3: getAcidState - split 1 => mock:/mocktable5 (to compute offset for original read) - 3 read ops due to recursive listing // call-4: open to read footer - split 2 => mock:/mocktable5/0_1 // call-5: open to read data - split 2 => mock:/mocktable5/0_1 - // call-6: getAcidState - split 2 => mock:/mocktable5 (to compute offset for original read) + // call-6: getAcidState - split 2 => mock:/mocktable5 (to compute offset for original read) - 3 read ops due to recursive listing // call-7: open to read footer - split 2 => mock:/mocktable5/0_0 (to get row count) // call-8: file status - split 2 => mock:/mocktable5/0_0 - assertEquals(8, readOpsDelta); + assertEquals(12, readOpsDelta); // revert back to local fs conf.set("fs.defaultFS", "file:///"); @@ -3599,12 +3599,12 @@ public void testACIDReaderFooterSerialize() throws Exception { } } // call-1: open to read data - split 1 => mock:/mocktable6/0_0 - // call-2: AcidUtils.getAcidState - split 1 => ls mock:/mocktable6 + // call-2: AcidUtils.getAcidState - split 1 => ls mock:/mocktable6 - 3 read ops due to recursive listing // call-3: open to read data - split 2 => mock:/mocktable6/0_1 - // call-4: AcidUtils.getAcidState - split 2 => ls mock:/mocktable6 + // call-4: AcidUtils.getAcidState - split 2 => ls mock:/mocktable6 - 3 read ops due to recursive listing // call-5: read footer - split 2 => mock:/mocktable6/0_0 (to get offset since it's original file) // call-6: file stat - split 2 => mock:/mocktable6/0_0 - assertEquals(6, readOpsDelta); + assertEquals(10, readOpsDelta); // revert back to local fs conf.set("fs.defaultFS", "file:///"); @@ -3679,7 +3679,7 @@ public void testACIDReaderNoFooterSerializeWithDeltas() throws Exception { readOpsDelta = statistics.getReadOps() - readOpsBefore; } } - assertEquals(7, readOpsDelta); + assertEquals(10, readOpsDelta); // revert back to local fs conf.set("fs.defaultFS", "file:///"); @@ -3753,7 +3753,7 @@ public void testACIDReaderFooterSerializeWithDeltas() throws Exception { readOpsDelta = statistics.getReadOps() - readOpsBefore; } } - assertEquals(5, readOpsDelta); + assertEquals(8, readOpsDelta); // revert back to local fs conf.set("fs.defaultFS", "file:///");
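
Note on the intent of the new AcidUtils.getHdfsDirSnapshots() / getAcidState(..., generateDirSnapshots) path: instead of issuing one listStatus() per base/delta directory (plus further per-directory calls from isRawFormat() and isValidBase()), the snapshot path issues a single recursive listFiles() over the candidate directory and groups the results by parent directory; OrcInputFormat turns this on by passing context.isAcid, while HiveInputFormat keeps passing false. Below is a minimal standalone sketch of that grouping idea using only the public Hadoop FileSystem API; the class and method names are illustrative and not part of this patch.

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class DirSnapshotSketch {

  /** Groups one recursive listing by parent directory, instead of listing each base/delta dir separately. */
  public static Map<Path, List<FileStatus>> listByParentDir(FileSystem fs, Path candidateDirectory)
      throws IOException {
    Map<Path, List<FileStatus>> filesByDir = new HashMap<>();
    // Single recursive listing of the whole partition/table directory.
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(candidateDirectory, true);
    while (it.hasNext()) {
      LocatedFileStatus file = it.next();
      String name = file.getPath().getName();
      // Same spirit as acidHiddenFileFilter: drop hidden/temp files but keep the "_metadata_acid" side file.
      if ((name.startsWith("_") || name.startsWith(".")) && !name.startsWith("_metadata_acid")) {
        continue;
      }
      filesByDir.computeIfAbsent(file.getPath().getParent(), dir -> new ArrayList<>()).add(file);
    }
    return filesByDir;
  }
}

With the per-directory file lists captured up front, answers such as raw-format detection, valid-base checks and metadata side files can be served from the cached HdfsDirSnapshot instead of going back to the NameNode for each directory.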
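
The new acidHiddenFileFilter differs from the existing hiddenFileFilter only in that it keeps the ACID side files (the "_metadata_acid" marker and the ORC ACID version file) so they are visible to the recursive listing. A small JUnit-style sketch of the expected accept/reject behaviour follows; the test class itself is hypothetical and not part of this patch.

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.junit.Test;

public class TestAcidHiddenFileFilterSketch {

  @Test
  public void keepsAcidSideFilesButDropsOtherHiddenFiles() {
    // Regular bucket files still pass, as with hiddenFileFilter.
    assertTrue(AcidUtils.acidHiddenFileFilter.accept(new Path("/tbl/part1/delta_1_1/bucket_00000")));
    // The ACID metadata side file is kept, unlike with hiddenFileFilter.
    assertTrue(AcidUtils.acidHiddenFileFilter.accept(new Path("/tbl/part1/base_5/_metadata_acid")));
    // Other underscore- or dot-prefixed names are still filtered out.
    assertFalse(AcidUtils.acidHiddenFileFilter.accept(new Path("/tbl/part1/_tmp_space.db")));
    assertFalse(AcidUtils.acidHiddenFileFilter.accept(new Path("/tbl/part1/.hidden")));
  }
}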